From e3d30baf691bee786ec760edd1473815cee27886 Mon Sep 17 00:00:00 2001 From: zhangdonghao Date: Thu, 7 Nov 2024 15:36:24 +0800 Subject: [PATCH 1/3] [Feature][Mongodb-CDC] support multi-table read --- docs/en/connector-v2/source/MongoDB-CDC.md | 69 ++++++- .../catalog/schema/TableSchemaOptions.java | 6 + .../MongodbIncrementalSourceFactory.java | 72 ++++++-- .../source/offset/ChangeStreamOffset.java | 5 +- .../converter/AbstractJdbcRowConverter.java | 20 +- .../src/test/java/mongodb/MongodbCDCIT.java | 171 ++++++++++-------- .../src/test/resources/ddl/inventory.js | 6 + .../src/test/resources/ddl/inventoryClean.js | 2 + .../src/test/resources/ddl/inventoryDDL.js | 15 +- .../src/test/resources/ddl/mongodb_cdc.sql | 6 + .../mongodb_multi_table_cdc_to_mysql.conf | 77 ++++++++ .../resources/mongodbcdc_metadata_trans.conf | 5 + .../test/resources/mongodbcdc_to_mysql.conf | 7 +- 13 files changed, 356 insertions(+), 105 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_table_cdc_to_mysql.conf diff --git a/docs/en/connector-v2/source/MongoDB-CDC.md b/docs/en/connector-v2/source/MongoDB-CDC.md index d7e6c7e440f..3221fc94679 100644 --- a/docs/en/connector-v2/source/MongoDB-CDC.md +++ b/docs/en/connector-v2/source/MongoDB-CDC.md @@ -112,7 +112,8 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatun | password | String | No | - | Password to be used when connecting to MongoDB. | | database | List | Yes | - | Name of the database to watch for changes. If not set then all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression. eg. `db1,db2`. | | collection | List | Yes | - | Name of the collection in the database to watch for changes. If not set then all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers. eg. `db1.coll1,db2.coll2`. | -| schema | | yes | - | The structure of the data, including field names and field types. | +| schema | | no | - | The structure of the data, including field names and field types, use single table cdc. | +| table_configs | | no | - | The structure of the data, including field names and field types, use muliti table cdc. | | connection.options | String | No | - | The ampersand-separated connection options of MongoDB. eg. `replicaSet=test&connectTimeoutMS=300000`. | | batch.size | Long | No | 1024 | The cursor batch size. | | poll.max.batch.size | Enum | No | 1024 | Maximum number of change stream documents to include in a single batch when polling for new data. | @@ -126,6 +127,7 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatun > 1.If the collection changes at a slow pace, it is strongly recommended to set an appropriate value greater than 0 for the heartbeat.interval.ms parameter. When we recover a Seatunnel job from a checkpoint or savepoint, the heartbeat events can push the resumeToken forward to avoid its expiration.
> 2.MongoDB has a limit of 16MB for a single document. Change documents include additional information, so even if the original document is not larger than 15MB, the change document may exceed the 16MB limit, resulting in the termination of the Change Stream operation.
> 3.It is recommended to use immutable shard keys. In MongoDB, shard keys allow modifications after transactions are enabled, but changing the shard key can cause frequent shard migrations, resulting in additional performance overhead. Additionally, modifying the shard key can also cause the Update Lookup feature to become ineffective, leading to inconsistent results in CDC (Change Data Capture) scenarios.
+> 4.`schema` `table_configs` are mutually exclusive, and one must be configured at a time. ## How to Create a MongoDB CDC Data Synchronization Jobs @@ -149,6 +151,7 @@ source { username = stuser password = stpw schema = { + table = "inventory.products" fields { "_id" : string, "name" : string, @@ -187,6 +190,7 @@ source { username = stuser password = stpw schema = { + table = "inventory.products" fields { "_id" : string, "name" : string, @@ -213,6 +217,67 @@ sink { } ``` +## Multi-table Synchronization + +The following example demonstrates how to create a data synchronization job that read the cdc data of multiple library tables mongodb and prints it on the local client: + +```hocon +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + MongoDB-CDC { + hosts = "mongo0:27017" + database = ["inventory"] + collection = ["inventory.products", "inventory.orders"] + username = superuser + password = superpw + tables_configs = [ + { + schema { + table = "inventory.products" + primaryKey { + name = "id" + columnNames = ["_id"] + } + fields { + "_id" : string, + "name" : string, + "description" : string, + "weight" : string + } + } + }, + { + schema { + table = "inventory.orders" + primaryKey { + name = "order_number" + columnNames = ["order_number"] + } + fields { + "order_number" : int, + "order_date" : string, + "quantity" : int, + "product_id" : string + } + } + } + ] + } +} + +# Console printing of the read Mongodb data +sink { + Console { + parallelism = 1 + } +} +``` ## Format of real-time streaming data @@ -244,7 +309,7 @@ sink { "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , - "uid" : + "uid" : } } ``` diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java index 794dbe833cb..a0b4460c0a5 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java @@ -55,6 +55,12 @@ public static class TableIdentifierOptions { .noDefaultValue() .withDescription("SeaTunnel Schema"); + public static final Option>> TABLES_CONFIGS = + Options.key("tables_configs") + .type(new TypeReference>>() {}) + .noDefaultValue() + .withDescription("The multiple table config list of SeaTunnel Schema"); + // We should use ColumnOptions instead of FieldOptions @Deprecated public static class FieldOptions { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java index 5e3a95145c5..fbb3da4c164 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java @@ -17,18 +17,20 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb; +import org.apache.seatunnel.api.common.CommonOptions; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions; @@ -37,7 +39,9 @@ import com.google.auto.service.AutoService; import java.io.Serializable; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -56,8 +60,8 @@ public OptionRule optionRule() { .required( MongodbSourceOptions.HOSTS, MongodbSourceOptions.DATABASE, - MongodbSourceOptions.COLLECTION, - TableSchemaOptions.SCHEMA) + MongodbSourceOptions.COLLECTION) + .exclusive(TableSchemaOptions.SCHEMA, TableSchemaOptions.TABLES_CONFIGS) .optional( MongodbSourceOptions.USERNAME, MongodbSourceOptions.PASSWORD, @@ -86,30 +90,58 @@ public Class getSourceClass() { public TableSource createSource(TableSourceFactoryContext context) { return () -> { - List configCatalog = - CatalogTableUtil.getCatalogTables( - context.getOptions(), context.getClassLoader()); + List catalogTables = buildWithConfig(context.getOptions()); List collections = context.getOptions().get(MongodbSourceOptions.COLLECTION); - if (collections.size() != configCatalog.size()) { + if (collections.size() != catalogTables.size()) { throw new MongodbConnectorException( ILLEGAL_ARGUMENT, "The number of collections must be equal to the number of schema tables"); } - List catalogTables = - IntStream.range(0, configCatalog.size()) - .mapToObj( - i -> { - CatalogTable catalogTable = configCatalog.get(i); - String fullName = collections.get(i); - TableIdentifier tableIdentifier = - TableIdentifier.of( - catalogTable.getCatalogName(), - TablePath.of(fullName)); - return CatalogTable.of(tableIdentifier, catalogTable); - }) - .collect(Collectors.toList()); + IntStream.range(0, catalogTables.size()) + .forEach( + i -> { + CatalogTable catalogTable = catalogTables.get(i); + String collect = collections.get(i); + String fullName = catalogTable.getTablePath().getFullName(); + if (fullName.equals(TablePath.DEFAULT.getFullName()) + && !collections.contains(TablePath.DEFAULT.getFullName())) { + throw new MongodbConnectorException( + ILLEGAL_ARGUMENT, + "The `schema` or `table_configs` configuration is incorrect, Please check the configuration."); + } + if (!fullName.equals(collect)) { + throw new MongodbConnectorException( + ILLEGAL_ARGUMENT, + "The collection name must be consistent with the schema table name, please configure in the order of `collection`"); + } + }); return (SeaTunnelSource) new MongodbIncrementalSource<>(context.getOptions(), catalogTables); }; } + + private List buildWithConfig(ReadonlyConfig config) { + String factoryId = config.get(CommonOptions.PLUGIN_NAME).replace("-CDC", ""); + Map schemaMap = config.get(TableSchemaOptions.SCHEMA); + if (schemaMap != null) { + if (schemaMap.isEmpty()) { + throw new SeaTunnelException("Schema config can not be empty"); + } + CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(factoryId, config); + return Collections.singletonList(catalogTable); + } + List> schemaMaps = config.get(TableSchemaOptions.TABLES_CONFIGS); + if (schemaMaps != null) { + if (schemaMaps.isEmpty()) { + throw new SeaTunnelException("tables_configs can not be empty"); + } + return schemaMaps.stream() + .map( + map -> + CatalogTableUtil.buildWithConfig( + factoryId, ReadonlyConfig.fromMap(map))) + .collect(Collectors.toList()); + } + return Collections.emptyList(); + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamOffset.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamOffset.java index 35acf43bbac..2096fe7a6b2 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamOffset.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamOffset.java @@ -67,7 +67,10 @@ public ChangeStreamOffset(BsonTimestamp timestamp) { } public BsonTimestamp getTimestamp() { - long timestamp = Long.parseLong(offset.get(TIMESTAMP_FIELD)); + long timestamp = System.currentTimeMillis(); + if (offset.get(TIMESTAMP_FIELD) != null) { + timestamp = Long.parseLong(offset.get(TIMESTAMP_FIELD)); + } return new BsonTimestamp(timestamp); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java index 691de6b77ce..90fcdcedce9 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java @@ -39,6 +39,7 @@ import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; +import java.sql.Types; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -188,10 +189,11 @@ public PreparedStatement toExternal( throws SQLException { SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) { + Object fieldValue = null; try { SeaTunnelDataType seaTunnelDataType = rowType.getFieldType(fieldIndex); int statementIndex = fieldIndex + 1; - Object fieldValue = row.getField(fieldIndex); + fieldValue = row.getField(fieldIndex); if (fieldValue == null) { statement.setObject(statementIndex, null); continue; @@ -228,28 +230,27 @@ public PreparedStatement toExternal( break; case DATE: LocalDate localDate = (LocalDate) row.getField(fieldIndex); - statement.setDate(statementIndex, java.sql.Date.valueOf(localDate)); + statement.setDate(statementIndex, Date.valueOf(localDate)); break; case TIME: writeTime(statement, statementIndex, (LocalTime) row.getField(fieldIndex)); break; case TIMESTAMP: LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex); - statement.setTimestamp( - statementIndex, java.sql.Timestamp.valueOf(localDateTime)); + statement.setTimestamp(statementIndex, Timestamp.valueOf(localDateTime)); break; case BYTES: statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex)); break; case NULL: - statement.setNull(statementIndex, java.sql.Types.NULL); + statement.setNull(statementIndex, Types.NULL); break; case ARRAY: SeaTunnelDataType elementType = ((ArrayType) seaTunnelDataType).getElementType(); Object[] array = (Object[]) row.getField(fieldIndex); if (array == null) { - statement.setNull(statementIndex, java.sql.Types.ARRAY); + statement.setNull(statementIndex, Types.ARRAY); break; } if (SqlType.TINYINT.equals(elementType.getSqlType())) { @@ -272,7 +273,10 @@ public PreparedStatement toExternal( } catch (Exception e) { throw new JdbcConnectorException( JdbcConnectorErrorCode.DATA_TYPE_CAST_FAILED, - "error field:" + rowType.getFieldNames()[fieldIndex], + "error field:" + + rowType.getFieldNames()[fieldIndex] + + "error value:" + + fieldValue, e); } } @@ -281,6 +285,6 @@ public PreparedStatement toExternal( protected void writeTime(PreparedStatement statement, int index, LocalTime time) throws SQLException { - statement.setTime(index, java.sql.Time.valueOf(time)); + statement.setTime(index, Time.valueOf(time)); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java index 7cf18c80322..873c5e27d0a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java @@ -30,6 +30,7 @@ import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.bson.Document; +import org.bson.types.ObjectId; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -47,14 +48,16 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.math.BigDecimal; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -75,7 +78,8 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource { // mongodb protected static final String MONGODB_DATABASE = "inventory"; - protected static final String MONGODB_COLLECTION = "products"; + protected static final String MONGODB_COLLECTION_1 = "products"; + protected static final String MONGODB_COLLECTION_2 = "orders"; protected MongoDBContainer mongodbContainer; protected MongoClient client; @@ -93,7 +97,10 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource { private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(); // mysql sink table query sql - private static final String SINK_SQL = "select name,description,weight from products"; + private static final String SINK_SQL_PRODUCTS = "select name,description,weight from products"; + + private static final String SINK_SQL_ORDERS = + "select order_number,order_date,quantity,product_id from orders order by order_number asc"; private static final String MYSQL_DRIVER_JAR = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; @@ -163,70 +170,38 @@ public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) } return null; }); - await().atMost(60000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> { - Assertions.assertIterableEquals( - readMongodbData().stream() - .peek(e -> e.remove("_id")) - .map(Document::entrySet) - .map(Set::stream) - .map( - entryStream -> - entryStream - .map(Map.Entry::getValue) - .collect( - Collectors.toCollection( - ArrayList - ::new))) - .collect(Collectors.toList()), - querySql()); - }); - + TimeUnit.SECONDS.sleep(10); // insert update delete upsertDeleteSourceTable(); + assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); - await().atMost(60000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> { - Assertions.assertIterableEquals( - readMongodbData().stream() - .peek(e -> e.remove("_id")) - .map(Document::entrySet) - .map(Set::stream) - .map( - entryStream -> - entryStream - .map(Map.Entry::getValue) - .collect( - Collectors.toCollection( - ArrayList - ::new))) - .collect(Collectors.toList()), - querySql()); - }); + cleanSourceTable(); + assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); + } + @TestTemplate + public void testMongodbCdcMultiTableToMysqlCheckDataE2e(TestContainer container) + throws InterruptedException { cleanSourceTable(); + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob("/mongodb_multi_table_cdc_to_mysql.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(); + } + return null; + }); + TimeUnit.SECONDS.sleep(10); + // insert update delete + upsertDeleteSourceTable(); + assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); + assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); - await().atMost(60000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> { - Assertions.assertIterableEquals( - readMongodbData().stream() - .peek(e -> e.remove("_id")) - .map(Document::entrySet) - .map(Set::stream) - .map( - entryStream -> - entryStream - .map(Map.Entry::getValue) - .collect( - Collectors.toCollection( - ArrayList - ::new))) - .collect(Collectors.toList()), - querySql()); - }); + cleanSourceTable(); + assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); + assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); } @TestTemplate @@ -268,6 +243,39 @@ public void testMongodbCdcMetadataTrans(TestContainer container) throws Interrup } } + private void assertionsSourceAndSink(String mongodbCollection, String sinkMysqlQuery) { + List> expected = + readMongodbData(mongodbCollection).stream() + .peek(e -> e.remove("_id")) + .map(Document::entrySet) + .map(Set::stream) + .map( + entryStream -> + entryStream + .map( + entry -> { + Object value = entry.getValue(); + if (value instanceof Number) { + return new BigDecimal( + value.toString()) + .intValue(); + } + if (value instanceof ObjectId) { + return ((ObjectId) value) + .toString(); + } + return value; + }) + .collect(Collectors.toCollection(ArrayList::new))) + .collect(Collectors.toList()); + log.info("Print mongodb source data: \n{}", expected); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals(expected, querySql(sinkMysqlQuery)); + }); + } + private Connection getJdbcConnection() throws SQLException { return DriverManager.getConnection( MYSQL_CONTAINER.getJdbcUrl(), @@ -275,10 +283,9 @@ private Connection getJdbcConnection() throws SQLException { MYSQL_CONTAINER.getPassword()); } - private List> querySql() { + private List> querySql(String querySql) { try (Connection connection = getJdbcConnection(); - ResultSet resultSet = - connection.createStatement().executeQuery(MongodbCDCIT.SINK_SQL)) { + ResultSet resultSet = connection.createStatement().executeQuery(querySql)) { List> result = new ArrayList<>(); int columnCount = resultSet.getMetaData().getColumnCount(); while (resultSet.next()) { @@ -286,21 +293,45 @@ private List> querySql() { for (int i = 1; i <= columnCount; i++) { objects.add(resultSet.getObject(i)); } - log.info("Print mysql sink data:" + objects); + log.info("Print mysql sink data: {} ", objects); result.add(objects); } + log.info("============================= mysql data ================================"); return result; } catch (SQLException e) { throw new RuntimeException(e); } } + private void truncateMysqlTable(String tableName) { + String checkTableExistsSql = + "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = ? AND table_name = ?"; + String truncateTableSql = String.format("TRUNCATE TABLE %s", tableName); + + try (Connection connection = getJdbcConnection(); + PreparedStatement checkStmt = connection.prepareStatement(checkTableExistsSql)) { + checkStmt.setString(1, MYSQL_DATABASE); + checkStmt.setString(2, tableName); + try (ResultSet rs = checkStmt.executeQuery()) { + if (rs.next() && rs.getInt(1) > 0) { + try (Statement truncateStmt = connection.createStatement()) { + truncateStmt.executeUpdate(truncateTableSql); + } + } + } + } catch (SQLException e) { + throw new RuntimeException("Error checking if table exists: " + tableName, e); + } + } + private void upsertDeleteSourceTable() { mongodbContainer.executeCommandFileInDatabase("inventoryDDL", MONGODB_DATABASE); } private void cleanSourceTable() { mongodbContainer.executeCommandFileInDatabase("inventoryClean", MONGODB_DATABASE); + truncateMysqlTable(MONGODB_COLLECTION_1); + truncateMysqlTable(MONGODB_COLLECTION_2); } public void initConnection() { @@ -309,17 +340,13 @@ public void initConnection() { String url = String.format( "mongodb://%s:%s@%s:%d/%s?authSource=admin", - "superuser", - "superpw", - ipAddress, - port, - MONGODB_DATABASE + "." + MONGODB_COLLECTION); + "superuser", "superpw", ipAddress, port, MONGODB_DATABASE); client = MongoClients.create(url); } - protected List readMongodbData() { + protected List readMongodbData(String collection) { MongoCollection sinkTable = - client.getDatabase(MONGODB_DATABASE).getCollection(MongodbCDCIT.MONGODB_COLLECTION); + client.getDatabase(MONGODB_DATABASE).getCollection(collection); // If the cursor has been traversed, it will automatically close without explicitly closing. MongoCursor cursor = sinkTable.find().sort(Sorts.ascending("_id")).cursor(); List documents = new ArrayList<>(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventory.js b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventory.js index c834ec8a2ce..cd4d6cd0abb 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventory.js +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventory.js @@ -22,3 +22,9 @@ db.getCollection('products').insertOne({"_id": ObjectId("10000000000000000000010 db.getCollection('products').insertOne({"_id": ObjectId("100000000000000000000107"), "name": "rocks", "description": "box of assorted rocks", "weight": "53"}); db.getCollection('products').insertOne({"_id": ObjectId("100000000000000000000108"), "name": "jacket", "description": "water resistent black wind breaker", "weight": "1"}); + +db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000101"),"order_number": 102482, "order_date": "2023-11-12", "quantity": 2 , "product_id": ObjectId("100000000000000000000101")}); +db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000102"),"order_number": 102483, "order_date": "2023-11-13", "quantity": 5 , "product_id": ObjectId("100000000000000000000102")}); +db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000103"),"order_number": 102484, "order_date": "2023-11-14", "quantity": 6 , "product_id": ObjectId("100000000000000000000103")}); +db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000104"),"order_number": 102485, "order_date": "2023-11-15", "quantity": 9 , "product_id": ObjectId("100000000000000000000104")}); +db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000105"),"order_number": 102486, "order_date": "2023-11-16", "quantity": 8 , "product_id": ObjectId("100000000000000000000105")}); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js index fbbb0ea0df6..2b99f986181 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js @@ -14,3 +14,5 @@ // limitations under the License. db.getCollection('products').deleteMany({}) + +db.getCollection('orders').deleteMany({}) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryDDL.js b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryDDL.js index db05f5f59ff..9c4059ee1d7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryDDL.js +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryDDL.js @@ -29,4 +29,17 @@ db.getCollection('products').deleteOne({"_id": ObjectId("10000000000000000000010 db.getCollection('products').deleteOne({"name": "car battery"}); db.getCollection('products').deleteOne({"name": "12-pack drill bits"}); db.getCollection('products').deleteOne({"name": "hammer", "weight": "875"}); -db.getCollection('products').deleteOne({"name": "jacket"}); \ No newline at end of file +db.getCollection('products').deleteOne({"name": "jacket"}); + + +db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000106"),"order_number": 102487, "order_date": "2023-11-12", "quantity": 2 , "product_id": ObjectId("100000000000000000000113")}); +db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000107"),"order_number": 102488, "order_date": "2023-11-13", "quantity": 5 , "product_id": ObjectId("100000000000000000000112")}); +db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000108"),"order_number": 102489, "order_date": "2023-11-14", "quantity": 6 , "product_id": ObjectId("100000000000000000000111")}); +db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000109"),"order_number": 102490, "order_date": "2023-11-15", "quantity": 9 , "product_id": ObjectId("100000000000000000000110")}); +db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000110"),"order_number": 102491, "order_date": "2023-11-16", "quantity": 8 , "product_id": ObjectId("100000000000000000000109")}); + +db.getCollection('orders').updateOne({"order_number": 102484}, {$set: {"quantity": 99}}); + +db.getCollection('orders').deleteOne({"order_number": 102487}); +db.getCollection('orders').deleteOne({"order_number": 102488}); +db.getCollection('orders').deleteOne({"order_number": 102489}); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/mongodb_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/mongodb_cdc.sql index cc7a619af69..bf25364e016 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/mongodb_cdc.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/mongodb_cdc.sql @@ -30,3 +30,9 @@ CREATE TABLE products ( weight VARCHAR(255) ); +CREATE TABLE orders ( + order_number INT NOT NULL PRIMARY KEY, + order_date VARCHAR(20) NOT NULL, + quantity INT NOT NULL, + product_id VARCHAR(512) NOT NULL +); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_table_cdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_table_cdc_to_mysql.conf new file mode 100644 index 00000000000..528e712e612 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodb_multi_table_cdc_to_mysql.conf @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + MongoDB-CDC { + hosts = "mongo0:27017" + database = ["inventory"] + collection = ["inventory.products","inventory.orders"] + username = superuser + password = superpw + tables_configs = [ + { + schema { + table = "inventory.products" + primaryKey { + name = "id" + columnNames = ["_id"] + } + fields { + "_id" : string, + "name" : string, + "description" : string, + "weight" : string + } + } + }, + { + schema { + table = "inventory.orders" + primaryKey { + name = "order_number" + columnNames = ["order_number"] + } + fields { + "order_number" : int, + "order_date" : string, + "quantity" : int, + "product_id" : string + } + } + } + ] + } +} + +sink { + jdbc { + url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user" + password = "seatunnel" + generate_sink_sql = true + database = mongodb_cdc + table = "${table_name}" + primary_keys = ["${primary_key}"] + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf index bc6475359c4..e4e8868c8a4 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf @@ -29,6 +29,11 @@ source { username = superuser password = superpw schema = { + table = "inventory.products" + primaryKey { + name = "id" + columnNames = ["_id"] + } fields { "_id": string, "name": string, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf index f4e3c457355..d4b72afddb7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf @@ -29,6 +29,11 @@ source { username = superuser password = superpw schema = { + table = "inventory.products" + primaryKey { + name = "id" + columnNames = ["_id"] + } fields { "_id": string, "name": string, @@ -51,4 +56,4 @@ sink { table = products primary_keys = ["_id"] } -} \ No newline at end of file +} From ad9f8b29599ccf49264bc7a71a5daa6e8480377c Mon Sep 17 00:00:00 2001 From: zhangdonghao Date: Tue, 19 Nov 2024 15:04:38 +0800 Subject: [PATCH 2/3] [Feature][Mongodb-CDC] support multi-table read --- .../src/test/java/mongodb/MongodbCDCIT.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java index 873c5e27d0a..cbdf35954f8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java @@ -173,13 +173,20 @@ public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) TimeUnit.SECONDS.sleep(10); // insert update delete upsertDeleteSourceTable(); + TimeUnit.SECONDS.sleep(20); assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); cleanSourceTable(); + TimeUnit.SECONDS.sleep(20); assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); } @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.") public void testMongodbCdcMultiTableToMysqlCheckDataE2e(TestContainer container) throws InterruptedException { cleanSourceTable(); @@ -196,10 +203,12 @@ public void testMongodbCdcMultiTableToMysqlCheckDataE2e(TestContainer container) TimeUnit.SECONDS.sleep(10); // insert update delete upsertDeleteSourceTable(); + TimeUnit.SECONDS.sleep(20); assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); cleanSourceTable(); + TimeUnit.SECONDS.sleep(20); assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); } From b5661da63761d47b02cb8fca42d5f31249bc85bc Mon Sep 17 00:00:00 2001 From: zhangdonghao Date: Thu, 21 Nov 2024 18:43:33 +0800 Subject: [PATCH 3/3] [Feature][Mongodb-CDC] support multi-table read --- .../src/test/java/mongodb/MongodbCDCIT.java | 44 +++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java index cbdf35954f8..d848d36a00f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java @@ -184,10 +184,9 @@ public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) @TestTemplate @DisabledOnContainer( value = {}, - type = {EngineType.SPARK, EngineType.FLINK}, - disabledReason = - "This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.") - public void testMongodbCdcMultiTableToMysqlCheckDataE2e(TestContainer container) + type = {EngineType.SPARK, EngineType.SEATUNNEL}, + disabledReason = "") + public void testMongodbCdcMultiTableToMysqlByFlinkDataE2e(TestContainer container) throws InterruptedException { cleanSourceTable(); CompletableFuture.supplyAsync( @@ -203,14 +202,51 @@ public void testMongodbCdcMultiTableToMysqlCheckDataE2e(TestContainer container) TimeUnit.SECONDS.sleep(10); // insert update delete upsertDeleteSourceTable(); + TimeUnit.SECONDS.sleep(30); + assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); + assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); + + cleanSourceTable(); TimeUnit.SECONDS.sleep(20); assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); + } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "") + public void testMongodbCdcMultiTableToMysqlByZetaE2E(TestContainer container) + throws InterruptedException { cleanSourceTable(); + Long jobId = JobIdGenerator.newJobId(); + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob( + "/mongodb_multi_table_cdc_to_mysql.conf", String.valueOf(jobId)); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(); + } + return null; + }); + TimeUnit.SECONDS.sleep(10); + // insert update delete + upsertDeleteSourceTable(); TimeUnit.SECONDS.sleep(20); + // cancel job + try { + Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId)); + Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); + + cleanSourceTable(); } @TestTemplate