Skip to content

Commit

Permalink
[Bug][Connector-v2] MongoDB CDC Set SeatunnelRow's tableId (#7935)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 authored Nov 6, 2024
1 parent 5089d8a commit f3970d6
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 76 deletions.
82 changes: 10 additions & 72 deletions docs/en/connector-v2/source/MongoDB-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,14 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatun
## Source Options
| Name | Type | Required | Default | Description |
| Name | Type | Required | Default | Description |
|------------------------------------|--------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| hosts | String | Yes | - | The comma-separated list of hostname and port pairs of the MongoDB servers. eg. `localhost:27017,localhost:27018` |
| username | String | No | - | Name of the database user to be used when connecting to MongoDB. |
| password | String | No | - | Password to be used when connecting to MongoDB. |
| database | List | Yes | - | Name of the database to watch for changes. If not set then all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression. eg. `db1,db2`. |
| collection | List | Yes | - | Name of the collection in the database to watch for changes. If not set then all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers. eg. `db1.coll1,db2.coll2`. |
| schema | Config | Yes | - | The structure of the data, including field names and field types. |
| connection.options | String | No | - | The ampersand-separated connection options of MongoDB. eg. `replicaSet=test&connectTimeoutMS=300000`. |
| batch.size | Long | No | 1024 | The cursor batch size. |
| poll.max.batch.size | Integer | No | 1024 | Maximum number of change stream documents to include in a single batch when polling for new data. |
Expand Down Expand Up @@ -185,6 +186,14 @@ source {
collection = ["inventory.products"]
username = stuser
password = stpw
schema = {
fields {
"_id" : string,
"name" : string,
"description" : string,
"weight" : string
}
}
}
}
Expand All @@ -204,76 +213,6 @@ sink {
}
```
## Multi-table Synchronization
The following example demonstrates how to create a data synchronization job that reads CDC data from multiple MongoDB databases and collections and prints it on the local client:
```hocon
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory","crm"]
collection = ["inventory.products","crm.test"]
username = stuser
password = stpw
}
}
# Console printing of the read Mongodb data
sink {
Console {
parallelism = 1
}
}
```
### Tips:
> 1. CDC synchronization of multiple databases and collections cannot specify a schema, and can only output JSON data downstream.
> This is because MongoDB does not provide metadata information for querying, so to support multiple tables, all of them must be read with a single structure.
## Regular Expression Matching for Multiple Tables
The following example demonstrates how to create a data synchronization job that uses regular expressions to read data from multiple MongoDB databases and collections and prints it on the local client:
| Matching example | Expressions | | Describe |
|------------------|-------------|---|----------------------------------------------------------------------------------------|
| Prefix matching | ^(test).* | | Match the database name or table name with the prefix test, such as test1, test2, etc. |
| Suffix matching | .*[p$] | | Match the database name or table name with the suffix p, such as cdcp, edcp, etc. |
```hocon
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "mongo0:27017"
# This example uses the pattern (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt), which matches e.g. txc.tt and test2.test5.
database = ["(^(test).*|^(tpc).*|txc|.*[p$]|t{2})"]
collection = ["(t[5-8]|tt)"]
username = stuser
password = stpw
}
}
# Console printing of the read Mongodb data
sink {
Console {
parallelism = 1
}
}
```
## Format of real-time streaming data
Expand Down Expand Up @@ -309,4 +248,3 @@ sink {
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
Expand All @@ -31,11 +34,16 @@
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;

import com.google.auto.service.AutoService;

import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;

@AutoService(Factory.class)
public class MongodbIncrementalSourceFactory implements TableSourceFactory {
Expand All @@ -50,7 +58,8 @@ public OptionRule optionRule() {
.required(
MongodbSourceOptions.HOSTS,
MongodbSourceOptions.DATABASE,
MongodbSourceOptions.COLLECTION)
MongodbSourceOptions.COLLECTION,
TableSchemaOptions.SCHEMA)
.optional(
MongodbSourceOptions.USERNAME,
MongodbSourceOptions.PASSWORD,
Expand Down Expand Up @@ -79,9 +88,28 @@ public Class<? extends SeaTunnelSource> getSourceClass() {
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
return () -> {
List<CatalogTable> catalogTables =
List<CatalogTable> configCatalog =
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
List<String> collections = context.getOptions().get(MongodbSourceOptions.COLLECTION);
if (collections.size() != configCatalog.size()) {
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT,
"The number of collections must be equal to the number of schema tables");
}
List<CatalogTable> catalogTables =
IntStream.range(0, configCatalog.size())
.mapToObj(
i -> {
CatalogTable catalogTable = configCatalog.get(i);
String fullName = collections.get(i);
TableIdentifier tableIdentifier =
TableIdentifier.of(
catalogTable.getCatalogName(),
TablePath.of(fullName));
return CatalogTable.of(tableIdentifier, catalogTable);
})
.collect(Collectors.toList());
SeaTunnelDataType<SeaTunnelRow> dataType =
CatalogTableUtil.convertToMultipleRowType(catalogTables);
return (SeaTunnelSource<T, SplitT, StateT>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
Expand Down Expand Up @@ -62,10 +65,13 @@
import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;
import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE;
import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DEFAULT_JSON_WRITER_SETTINGS;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -169,8 +175,16 @@ private SeaTunnelRow extractRowData(
}

/**
 * Derives the table id of a change record from the namespace carried in its value.
 *
 * <p>The record value is read as a {@link Struct}; its {@code NS_FIELD} member is a nested
 * struct from which the {@code DB_FIELD} and {@code COLL_FIELD} strings are taken.
 *
 * @param record change record whose value holds the namespace struct
 * @return the string form of the {@link TablePath} built from the database and collection
 *     names, e.g. {@code inventory.products}
 */
private String extractTableId(SourceRecord record) {
    Struct messageStruct = (Struct) record.value();
    Struct nsStruct = (Struct) messageStruct.get(NS_FIELD);
    String databaseName = nsStruct.getString(DB_FIELD);
    String tableName = nsStruct.getString(COLL_FIELD);
    // No schema component is supplied: only database and collection make up the path.
    return TablePath.of(databaseName, null, tableName).toString();
}

/**
 * Test-only entry point that exposes the private table id extraction.
 *
 * @param record change record to read the table id from
 * @return the value produced by {@code extractTableId} for the given record
 */
@VisibleForTesting
public String extractTableIdForTest(SourceRecord record) {
    final String tableId = extractTableId(record);
    return tableId;
}

// -------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mongodb.sender;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;

import org.apache.kafka.connect.source.SourceRecord;

import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE_INSERT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_TRUE;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.TS_MS_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createSourceOffsetMap;

/** Unit test for the table id extraction of {@link MongoDBConnectorDeserializationSchema}. */
public class MongoDBConnectorDeserializationSchemaTest {

    /**
     * Builds an insert change record whose namespace is database {@code inventory} and
     * collection {@code products}, then checks that the extracted table id is {@code
     * inventory.products}.
     */
    @Test
    public void extractTableId() {
        // The schema content is irrelevant to table id extraction; it only has to be a valid
        // table definition so the deserialization schema can be constructed.
        CatalogTable catalogTable =
                CatalogTable.of(
                        TableIdentifier.of("catalog", "database", "table"),
                        TableSchema.builder()
                                .column(
                                        PhysicalColumn.of(
                                                "name1", BasicType.STRING_TYPE, 1L, true, null, ""))
                                .column(
                                        PhysicalColumn.of(
                                                "name2", BasicType.STRING_TYPE, 1L, true, null, ""))
                                .build(),
                        Collections.emptyMap(),
                        Collections.emptyList(),
                        "comment");
        SeaTunnelDataType<SeaTunnelRow> dataType =
                CatalogTableUtil.convertToMultipleRowType(Collections.singletonList(catalogTable));
        MongoDBConnectorDeserializationSchema schema =
                new MongoDBConnectorDeserializationSchema(dataType, dataType);

        // Build SourceRecord
        Map<String, String> partitionMap =
                MongodbRecordUtils.createPartitionMap("localhost:27017", "inventory", "products");

        BsonDocument valueDocument =
                new BsonDocument()
                        .append(
                                ID_FIELD,
                                new BsonDocument(ID_FIELD, new BsonInt64(10000000000001L)))
                        .append(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT))
                        // Namespace struct: the part of the record the table id is read from.
                        .append(
                                NS_FIELD,
                                new BsonDocument(DB_FIELD, new BsonString("inventory"))
                                        .append(COLL_FIELD, new BsonString("products")))
                        .append(
                                DOCUMENT_KEY,
                                new BsonDocument(ID_FIELD, new BsonInt64(10000000000001L)))
                        .append(FULL_DOCUMENT, new BsonDocument())
                        .append(TS_MS_FIELD, new BsonInt64(System.currentTimeMillis()))
                        .append(
                                SOURCE_FIELD,
                                new BsonDocument(SNAPSHOT_FIELD, new BsonString(SNAPSHOT_TRUE))
                                        .append(TS_MS_FIELD, new BsonInt64(0L)));
        BsonDocument keyDocument = new BsonDocument(ID_FIELD, valueDocument.get(ID_FIELD));
        SourceRecord sourceRecord =
                MongodbRecordUtils.buildSourceRecord(
                        partitionMap,
                        createSourceOffsetMap(keyDocument.getDocument(ID_FIELD), true),
                        "inventory.products",
                        keyDocument,
                        valueDocument);

        String tableId = schema.extractTableIdForTest(sourceRecord);
        Assertions.assertEquals("inventory.products", tableId);
    }
}

0 comments on commit f3970d6

Please sign in to comment.