[Feature][Mongodb-CDC] support multi-table read
hawk9821 committed Nov 20, 2024
1 parent 69cd4ae commit 539a903
Showing 14 changed files with 365 additions and 110 deletions.
69 changes: 67 additions & 2 deletions docs/en/connector-v2/source/MongoDB-CDC.md
@@ -112,7 +112,8 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatunnel
| password | String | No | - | Password to be used when connecting to MongoDB. |
| database | List | Yes | - | Name of the database to watch for changes. If not set then all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression. eg. `db1,db2`. |
| collection | List | Yes | - | Name of the collection in the database to watch for changes. If not set then all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers. eg. `db1.coll1,db2.coll2`. |
-| schema | | yes | - | The structure of the data, including field names and field types. |
+| schema | | no | - | The structure of the data, including field names and field types; used for single-table CDC. |
+| tables_configs | | no | - | The structure of the data, including field names and field types; used for multi-table CDC. |
| connection.options | String | No | - | The ampersand-separated connection options of MongoDB. eg. `replicaSet=test&connectTimeoutMS=300000`. |
| batch.size | Long | No | 1024 | The cursor batch size. |
| poll.max.batch.size | Enum | No | 1024 | Maximum number of change stream documents to include in a single batch when polling for new data. |
@@ -126,6 +127,7 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatunnel
> 1.If the collection changes at a slow pace, it is strongly recommended to set an appropriate value greater than 0 for the heartbeat.interval.ms parameter. When we recover a Seatunnel job from a checkpoint or savepoint, the heartbeat events can push the resumeToken forward to avoid its expiration.<br/>
> 2.MongoDB has a limit of 16MB for a single document. Change documents include additional information, so even if the original document is not larger than 15MB, the change document may exceed the 16MB limit, resulting in the termination of the Change Stream operation.<br/>
> 3.It is recommended to use immutable shard keys. In MongoDB, shard keys allow modifications after transactions are enabled, but changing the shard key can cause frequent shard migrations, resulting in additional performance overhead. Additionally, modifying the shard key can also cause the Update Lookup feature to become ineffective, leading to inconsistent results in CDC (Change Data Capture) scenarios.<br/>
+> 4.`schema` and `tables_configs` are mutually exclusive; exactly one of them must be configured.
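To make note 4 concrete, a minimal sketch of the two mutually exclusive shapes (connection options elided; the field lists here are illustrative, not a full schema):

```hocon
# Single-table CDC: one inline schema
MongoDB-CDC {
  # hosts/database/collection/credentials elided
  schema = {
    table = "inventory.products"
    fields {
      "_id" : string,
      "name" : string
    }
  }
}

# Multi-table CDC: one schema per table entry
MongoDB-CDC {
  # hosts/database/collection/credentials elided
  tables_configs = [
    { schema { table = "inventory.products" fields { "_id" : string } } },
    { schema { table = "inventory.orders" fields { "order_number" : int } } }
  ]
}
```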
## How to Create a MongoDB CDC Data Synchronization Job
@@ -149,6 +151,7 @@ source {
    username = stuser
    password = stpw
    schema = {
+     table = "inventory.products"
      fields {
        "_id" : string,
        "name" : string,
@@ -187,6 +190,7 @@ source {
    username = stuser
    password = stpw
    schema = {
+     table = "inventory.products"
      fields {
        "_id" : string,
        "name" : string,
@@ -213,6 +217,67 @@ sink {
}
```
+## Multi-table Synchronization
+
+The following example demonstrates how to create a data synchronization job that reads CDC data from multiple MongoDB collections and prints it on the local client:
+```hocon
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  MongoDB-CDC {
+    hosts = "mongo0:27017"
+    database = ["inventory"]
+    collection = ["inventory.products", "inventory.orders"]
+    username = superuser
+    password = superpw
+    tables_configs = [
+      {
+        schema {
+          table = "inventory.products"
+          primaryKey {
+            name = "id"
+            columnNames = ["_id"]
+          }
+          fields {
+            "_id" : string,
+            "name" : string,
+            "description" : string,
+            "weight" : string
+          }
+        }
+      },
+      {
+        schema {
+          table = "inventory.orders"
+          primaryKey {
+            name = "order_number"
+            columnNames = ["order_number"]
+          }
+          fields {
+            "order_number" : int,
+            "order_date" : string,
+            "quantity" : int,
+            "product_id" : string
+          }
+        }
+      }
+    ]
+  }
+}
+
+# Console printing of the read Mongodb data
+sink {
+  Console {
+    parallelism = 1
+  }
+}
+```
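The source factory added in this commit also validates ordering: the Nth entry of `collection` must name the same table as the Nth entry of `tables_configs`. A sketch of a fragment that would be rejected because the two lists are swapped (illustrative only):

```hocon
collection = ["inventory.products", "inventory.orders"]
tables_configs = [
  # rejected: tables_configs[0] is inventory.orders, but collection[0] is inventory.products
  { schema { table = "inventory.orders" fields { "order_number" : int } } },
  { schema { table = "inventory.products" fields { "_id" : string } } }
]
```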
## Format of real-time streaming data
@@ -244,7 +309,7 @@
"txnNumber" : <NumberLong>, // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number
"lsid" : { // Represents information related to the Session in which the transaction is located
"id" : <UUID>,
"uid" : <BinData>
"uid" : <BinData>
}
}
```
@@ -55,6 +55,12 @@ public static class TableIdentifierOptions {
.noDefaultValue()
.withDescription("SeaTunnel Schema");

+   public static final Option<List<Map<String, Object>>> TABLES_CONFIGS =
+           Options.key("tables_configs")
+                   .type(new TypeReference<List<Map<String, Object>>>() {})
+                   .noDefaultValue()
+                   .withDescription("The multiple table config list of SeaTunnel Schema");

// We should use ColumnOptions instead of FieldOptions
@Deprecated
public static class FieldOptions {
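The `tables_configs` option above types its value with a `TypeReference` so the nested list-of-maps shape survives generic-type erasure. A self-contained sketch of the same pattern using plain Jackson (an assumed dependency here, not part of this diff):

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;
import java.util.Map;

public class TypeReferenceDemo {
    public static void main(String[] args) throws Exception {
        String json =
                "[{\"schema\":{\"table\":\"inventory.products\"}},"
                        + "{\"schema\":{\"table\":\"inventory.orders\"}}]";
        ObjectMapper mapper = new ObjectMapper();
        // The anonymous TypeReference subclass captures List<Map<String, Object>>
        // at compile time, so the full generic type can be rebuilt at runtime.
        List<Map<String, Object>> tables =
                mapper.readValue(json, new TypeReference<List<Map<String, Object>>>() {});
        System.out.println(tables.get(1).get("schema")); // {table=inventory.orders}
    }
}
```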
@@ -47,6 +47,11 @@
<version>${debezium.version}</version>
<scope>compile</scope>
</dependency>
+   <dependency>
+       <groupId>org.apache.avro</groupId>
+       <artifactId>avro</artifactId>
+       <version>${avro.version}</version>
+   </dependency>
<dependency>
<groupId>org.mongodb.kafka</groupId>
<artifactId>mongo-kafka-connect</artifactId>
@@ -66,11 +71,6 @@
</exclusion>
</exclusions>
</dependency>
-   <dependency>
-       <groupId>org.apache.avro</groupId>
-       <artifactId>avro</artifactId>
-       <version>${avro.version}</version>
-   </dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
@@ -17,18 +17,20 @@

package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb;

+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
@@ -37,7 +39,9 @@
import com.google.auto.service.AutoService;

import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -56,8 +60,8 @@ public OptionRule optionRule() {
.required(
MongodbSourceOptions.HOSTS,
MongodbSourceOptions.DATABASE,
-                        MongodbSourceOptions.COLLECTION,
-                        TableSchemaOptions.SCHEMA)
+                        MongodbSourceOptions.COLLECTION)
+                .exclusive(TableSchemaOptions.SCHEMA, TableSchemaOptions.TABLES_CONFIGS)
.optional(
MongodbSourceOptions.USERNAME,
MongodbSourceOptions.PASSWORD,
@@ -86,30 +90,58 @@ public Class<? extends SeaTunnelSource> getSourceClass() {
    public <T, SplitT extends SourceSplit, StateT extends Serializable>
            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
        return () -> {
-           List<CatalogTable> configCatalog =
-                   CatalogTableUtil.getCatalogTables(
-                           context.getOptions(), context.getClassLoader());
+           List<CatalogTable> catalogTables = buildWithConfig(context.getOptions());
            List<String> collections = context.getOptions().get(MongodbSourceOptions.COLLECTION);
-           if (collections.size() != configCatalog.size()) {
+           if (collections.size() != catalogTables.size()) {
                throw new MongodbConnectorException(
                        ILLEGAL_ARGUMENT,
                        "The number of collections must be equal to the number of schema tables");
            }
-           List<CatalogTable> catalogTables =
-                   IntStream.range(0, configCatalog.size())
-                           .mapToObj(
-                                   i -> {
-                                       CatalogTable catalogTable = configCatalog.get(i);
-                                       String fullName = collections.get(i);
-                                       TableIdentifier tableIdentifier =
-                                               TableIdentifier.of(
-                                                       catalogTable.getCatalogName(),
-                                                       TablePath.of(fullName));
-                                       return CatalogTable.of(tableIdentifier, catalogTable);
-                                   })
-                           .collect(Collectors.toList());
+           IntStream.range(0, catalogTables.size())
+                   .forEach(
+                           i -> {
+                               CatalogTable catalogTable = catalogTables.get(i);
+                               String collect = collections.get(i);
+                               String fullName = catalogTable.getTablePath().getFullName();
+                               if (fullName.equals(TablePath.DEFAULT.getFullName())
+                                       && !collections.contains(TablePath.DEFAULT.getFullName())) {
+                                   throw new MongodbConnectorException(
+                                           ILLEGAL_ARGUMENT,
+                                           "The `schema` or `tables_configs` configuration is incorrect. Please check the configuration.");
+                               }
+                               if (!fullName.equals(collect)) {
+                                   throw new MongodbConnectorException(
+                                           ILLEGAL_ARGUMENT,
+                                           "The collection name must be consistent with the schema table name, please configure in the order of `collection`");
+                               }
+                           });
            return (SeaTunnelSource<T, SplitT, StateT>)
                    new MongodbIncrementalSource<>(context.getOptions(), catalogTables);
        };
    }
}

+   private List<CatalogTable> buildWithConfig(ReadonlyConfig config) {
+       String factoryId = config.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
+       Map<String, Object> schemaMap = config.get(TableSchemaOptions.SCHEMA);
+       if (schemaMap != null) {
+           if (schemaMap.isEmpty()) {
+               throw new SeaTunnelException("Schema config can not be empty");
+           }
+           CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(factoryId, config);
+           return Collections.singletonList(catalogTable);
+       }
+       List<Map<String, Object>> schemaMaps = config.get(TableSchemaOptions.TABLES_CONFIGS);
+       if (schemaMaps != null) {
+           if (schemaMaps.isEmpty()) {
+               throw new SeaTunnelException("tables_configs can not be empty");
+           }
+           return schemaMaps.stream()
+                   .map(
+                           map ->
+                                   CatalogTableUtil.buildWithConfig(
+                                           factoryId, ReadonlyConfig.fromMap(map)))
+                   .collect(Collectors.toList());
+       }
+       return Collections.emptyList();
+   }
}
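A self-contained sketch of the two checks the factory now performs, with SeaTunnel's catalog types replaced by plain strings (hypothetical names; plain Java, runnable as-is):

```java
import java.util.Arrays;
import java.util.List;

public class CollectionOrderCheck {
    // Mirrors the validation above: sizes must match, and the i-th collection
    // must equal the full name of the i-th configured schema table.
    static void validate(List<String> collections, List<String> schemaTableNames) {
        if (collections.size() != schemaTableNames.size()) {
            throw new IllegalArgumentException(
                    "The number of collections must be equal to the number of schema tables");
        }
        for (int i = 0; i < collections.size(); i++) {
            if (!schemaTableNames.get(i).equals(collections.get(i))) {
                throw new IllegalArgumentException(
                        "The collection name must be consistent with the schema table name, "
                                + "please configure in the order of `collection`");
            }
        }
    }

    public static void main(String[] args) {
        List<String> collections = Arrays.asList("inventory.products", "inventory.orders");
        validate(collections, Arrays.asList("inventory.products", "inventory.orders")); // passes
        validate(collections, Arrays.asList("inventory.orders", "inventory.products")); // throws
    }
}
```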
@@ -67,7 +67,10 @@ public ChangeStreamOffset(BsonTimestamp timestamp) {
}

public BsonTimestamp getTimestamp() {
-       long timestamp = Long.parseLong(offset.get(TIMESTAMP_FIELD));
+       long timestamp = System.currentTimeMillis();
+       if (offset.get(TIMESTAMP_FIELD) != null) {
+           timestamp = Long.parseLong(offset.get(TIMESTAMP_FIELD));
+       }
return new BsonTimestamp(timestamp);
}

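The `getTimestamp()` change above replaces a hard `Long.parseLong(null)` failure with a wall-clock fallback when the offset carries no timestamp field. A self-contained sketch of the same pattern (hypothetical names):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTimestampFallback {
    static final String TIMESTAMP_FIELD = "timestamp";

    // Parse the stored timestamp if present; otherwise fall back to "now",
    // so a missing field no longer throws a NumberFormatException.
    static long resolveTimestamp(Map<String, String> offset) {
        String raw = offset.get(TIMESTAMP_FIELD);
        return raw != null ? Long.parseLong(raw) : System.currentTimeMillis();
    }

    public static void main(String[] args) {
        Map<String, String> offset = new HashMap<>();
        System.out.println(resolveTimestamp(offset)); // falls back to current time
        offset.put(TIMESTAMP_FIELD, "1732060800000");
        System.out.println(resolveTimestamp(offset)); // parses the stored value
    }
}
```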
@@ -39,6 +39,7 @@
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
+import java.sql.Types;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -188,10 +189,11 @@ public PreparedStatement toExternal(
            throws SQLException {
        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
        for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
+           Object fieldValue = null;
            try {
                SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
                int statementIndex = fieldIndex + 1;
-               Object fieldValue = row.getField(fieldIndex);
+               fieldValue = row.getField(fieldIndex);
                if (fieldValue == null) {
                    statement.setObject(statementIndex, null);
                    continue;
@@ -228,28 +230,27 @@ public PreparedStatement toExternal(
                    break;
                case DATE:
                    LocalDate localDate = (LocalDate) row.getField(fieldIndex);
-                   statement.setDate(statementIndex, java.sql.Date.valueOf(localDate));
+                   statement.setDate(statementIndex, Date.valueOf(localDate));
                    break;
                case TIME:
                    writeTime(statement, statementIndex, (LocalTime) row.getField(fieldIndex));
                    break;
                case TIMESTAMP:
                    LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex);
-                   statement.setTimestamp(
-                           statementIndex, java.sql.Timestamp.valueOf(localDateTime));
+                   statement.setTimestamp(statementIndex, Timestamp.valueOf(localDateTime));
                    break;
                case BYTES:
                    statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex));
                    break;
                case NULL:
-                   statement.setNull(statementIndex, java.sql.Types.NULL);
+                   statement.setNull(statementIndex, Types.NULL);
                    break;
                case ARRAY:
                    SeaTunnelDataType elementType =
                            ((ArrayType) seaTunnelDataType).getElementType();
                    Object[] array = (Object[]) row.getField(fieldIndex);
                    if (array == null) {
-                       statement.setNull(statementIndex, java.sql.Types.ARRAY);
+                       statement.setNull(statementIndex, Types.ARRAY);
                        break;
                    }
                    if (SqlType.TINYINT.equals(elementType.getSqlType())) {
@@ -272,7 +273,10 @@ public PreparedStatement toExternal(
            } catch (Exception e) {
                throw new JdbcConnectorException(
                        JdbcConnectorErrorCode.DATA_TYPE_CAST_FAILED,
-                       "error field:" + rowType.getFieldNames()[fieldIndex],
+                       "error field:"
+                               + rowType.getFieldNames()[fieldIndex]
+                               + " error value:"
+                               + fieldValue,
                        e);
            }
        }
@@ -281,6 +285,6 @@ public PreparedStatement toExternal(

    protected void writeTime(PreparedStatement statement, int index, LocalTime time)
            throws SQLException {
-       statement.setTime(index, java.sql.Time.valueOf(time));
+       statement.setTime(index, Time.valueOf(time));
}
}
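The converter change hoists `fieldValue` out of the `try` block so the `DATA_TYPE_CAST_FAILED` message can report the offending value alongside the field name. A minimal sketch of the pattern (hypothetical names; plain Java):

```java
public class EnrichedCastError {
    static void setFields(Object[] row, String[] fieldNames) {
        for (int i = 0; i < row.length; i++) {
            Object fieldValue = null; // declared outside try so the catch can report it
            try {
                fieldValue = row[i];
                long ignored = (long) fieldValue; // may throw ClassCastException
            } catch (Exception e) {
                throw new RuntimeException(
                        "error field:" + fieldNames[i] + " error value:" + fieldValue, e);
            }
        }
    }

    public static void main(String[] args) {
        try {
            setFields(new Object[] {1L, "oops"}, new String[] {"id", "name"});
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // error field:name error value:oops
        }
    }
}
```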