Skip to content

Commit

Permalink
[Improve][Connector-V2] Add pre-check for table enable cdc (#8152)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Nov 29, 2024
1 parent ea4234a commit 9a5da78
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfi
@Override
List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig);

/**
 * Hook that lets a dialect verify that change capture is enabled for the given tables.
 *
 * <p>The default implementation performs no check at all; dialects that need a pre-check
 * override this method.
 *
 * @param jdbcConnection connection handed to the dialect for running the check
 * @param tableIds tables to verify
 * @throws SQLException declared so that overriding implementations may propagate database
 *     errors
 */
default void checkAllTablesEnabledCapture(
        final JdbcConnection jdbcConnection, final List<TableId> tableIds) throws SQLException {
    // Intentionally empty: no pre-check by default.
}

/**
* Creates and opens a new {@link JdbcConnection} backing connection pool.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
Expand Down Expand Up @@ -103,13 +104,32 @@ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
// Lists the tables selected by the configured table filters and runs the capture pre-check
// on them before handing them back to the caller.
//
// Throws SeaTunnelException when table discovery or the pre-check fails with a SQLException
// (the original exception is kept as the cause).
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
    PostgresSourceConfig postgresSourceConfig = (PostgresSourceConfig) sourceConfig;
    try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
        List<TableId> tables =
                TableDiscoveryUtils.listTables(
                        jdbcConnection, postgresSourceConfig.getTableFilters());
        // The check must run before returning: an early return of the listing would make
        // the pre-check unreachable.
        this.checkAllTablesEnabledCapture(jdbcConnection, tables);
        return tables;
    } catch (SQLException e) {
        throw new SeaTunnelException("Error to discover tables: " + e.getMessage(), e);
    }
}

/**
 * Verifies that every given table has a {@code FULL} replica identity.
 *
 * <p>Tables are checked in list order and the first table whose replica identity is not
 * {@code FULL} aborts the check.
 *
 * @param jdbcConnection connection to inspect; it is cast to {@link PostgresConnection}
 * @param tableIds tables to verify
 * @throws SQLException if reading the replica identity fails
 * @throws SeaTunnelException if a table does not have a full replica identity
 */
@Override
public void checkAllTablesEnabledCapture(JdbcConnection jdbcConnection, List<TableId> tableIds)
        throws SQLException {
    final PostgresConnection connection = (PostgresConnection) jdbcConnection;
    for (final TableId table : tableIds) {
        final ServerInfo.ReplicaIdentity identity = connection.readReplicaIdentityInfo(table);
        if (identity == ServerInfo.ReplicaIdentity.FULL) {
            continue;
        }
        // Report the offending table together with the statement that fixes it.
        final String message =
                String.format(
                        "Table %s does not have a full replica identity, please execute: ALTER TABLE %s REPLICA IDENTITY FULL;",
                        table, table);
        throw new SeaTunnelException(message);
    }
}

@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (postgresSchema == null) {
Expand Down Expand Up @@ -155,6 +175,12 @@ public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBas
if (sourceSplitBase.isSnapshotSplit()) {
return new PostgresSnapshotFetchTask(sourceSplitBase.asSnapshotSplit());
} else {
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
List<TableId> tables = sourceSplitBase.asIncrementalSplit().getTableIds();
this.checkAllTablesEnabledCapture(jdbcConnection, tables);
} catch (SQLException e) {
throw new SeaTunnelException("Error to check tables: " + e.getMessage(), e);
}
postgresWalFetchTask = new PostgresWalFetchTask(sourceSplitBase.asIncrementalSplit());
return postgresWalFetchTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.TableDiscoveryUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import io.debezium.connector.sqlserver.SqlServerChangeTable;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
Expand All @@ -46,6 +48,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/** The {@link JdbcDataSourceDialect} implementation for SQL Server datasource. */
public class SqlServerDialect implements JdbcDataSourceDialect {
Expand Down Expand Up @@ -88,13 +92,37 @@ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
// Lists the tables selected by the configured table filters and runs the capture pre-check
// on them before handing them back to the caller.
//
// Throws SeaTunnelException when table discovery or the pre-check fails with a SQLException
// (the original exception is kept as the cause).
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
    SqlServerSourceConfig sqlServerSourceConfig = (SqlServerSourceConfig) sourceConfig;
    try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
        List<TableId> tables =
                TableDiscoveryUtils.listTables(
                        jdbcConnection, sqlServerSourceConfig.getTableFilters());
        // The check must run before returning: an early return of the listing would make
        // the pre-check unreachable.
        this.checkAllTablesEnabledCapture(jdbcConnection, tables);
        return tables;
    } catch (SQLException e) {
        throw new SeaTunnelException("Error to discover tables: " + e.getMessage(), e);
    }
}

/**
 * Verifies that every given table appears among the change tables of its database.
 *
 * <p>The tables are grouped by catalog, the change tables of each catalog are fetched once,
 * and the first table whose id is not among their source table ids aborts the check.
 *
 * @param jdbcConnection connection to query; it is cast to {@link SqlServerConnection}
 * @param tableIds tables to verify
 * @throws SQLException if fetching the change tables fails
 * @throws SeaTunnelException if a table is not enabled for capture
 */
@Override
public void checkAllTablesEnabledCapture(JdbcConnection jdbcConnection, List<TableId> tableIds)
        throws SQLException {
    Map<String, List<TableId>> tablesByDatabase =
            tableIds.stream().collect(Collectors.groupingBy(TableId::catalog));
    for (Map.Entry<String, List<TableId>> entry : tablesByDatabase.entrySet()) {
        // Source table ids of all change tables in this database.
        Set<TableId> capturedTables =
                ((SqlServerConnection) jdbcConnection)
                        .getChangeTables(entry.getKey()).stream()
                                .map(changeTable -> changeTable.getSourceTableId())
                                .collect(Collectors.toSet());
        for (TableId candidate : entry.getValue()) {
            if (capturedTables.contains(candidate)) {
                continue;
            }
            throw new SeaTunnelException("Table " + candidate + " is not enabled for capture");
        }
    }
}

@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (sqlServerSchema == null) {
Expand All @@ -115,6 +143,12 @@ public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBas
if (sourceSplitBase.isSnapshotSplit()) {
return new SqlServerSnapshotFetchTask(sourceSplitBase.asSnapshotSplit());
} else {
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
List<TableId> tables = sourceSplitBase.asIncrementalSplit().getTableIds();
this.checkAllTablesEnabledCapture(jdbcConnection, tables);
} catch (SQLException e) {
throw new SeaTunnelException("Error to check tables: " + e.getMessage(), e);
}
return new SqlServerTransactionLogFetchTask(sourceSplitBase.asIncrementalSplit());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.seatunnel.connectors.seatunnel.cdc.postgres;

import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresDialect;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
Expand All @@ -29,6 +33,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +44,8 @@
import org.testcontainers.utility.DockerImageName;

import com.google.common.collect.Lists;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
Expand All @@ -53,6 +60,7 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -594,6 +602,32 @@ public void testPostgresCdcCheckDataWithCustomPrimaryKey(TestContainer container
}
}

/**
 * Checks that the Postgres dialect pre-check rejects {@code SINK_TABLE_1} with the expected
 * error message.
 */
@Test
public void testDialectCheckDisabledCDCTable() throws SQLException {
    JdbcSourceConfigFactory configFactory =
            new PostgresSourceConfigFactory()
                    .hostname(POSTGRES_CONTAINER.getHost())
                    .port(5432)
                    .username("postgres")
                    .password("postgres")
                    .databaseList(POSTGRESQL_DATABASE);
    PostgresDialect postgresDialect =
            new PostgresDialect(
                    (PostgresSourceConfigFactory) configFactory, Collections.emptyList());
    String expectedMessage =
            "Table sink_postgres_cdc_table_1 does not have a full replica identity, please execute: ALTER TABLE sink_postgres_cdc_table_1 REPLICA IDENTITY FULL;";
    try (JdbcConnection jdbc = postgresDialect.openJdbcConnection(configFactory.create(0))) {
        SeaTunnelException thrown =
                Assertions.assertThrows(
                        SeaTunnelException.class,
                        () ->
                                postgresDialect.checkAllTablesEnabledCapture(
                                        jdbc,
                                        Collections.singletonList(
                                                TableId.parse(SINK_TABLE_1))));
        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }
}

private Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
POSTGRES_CONTAINER.getJdbcUrl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.seatunnel.e2e.connector.cdc.sqlserver;

import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.SqlServerDialect;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
Expand All @@ -31,6 +35,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.MSSQLServerContainer;
Expand All @@ -39,6 +44,8 @@
import org.testcontainers.utility.DockerLoggerFactory;

import com.google.common.collect.Lists;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
Expand All @@ -52,6 +59,7 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -346,6 +354,33 @@ public void testSqlServerCDCMetadataTrans(TestContainer container) throws Interr
}
}

/**
 * Checks that the SQL Server dialect pre-check rejects {@code SINK_TABLE} with the expected
 * error message.
 */
@Test
public void testDialectCheckDisabledCDCTable() throws SQLException {
    initializeSqlServerTable("column_type_test");
    JdbcSourceConfigFactory configFactory =
            new SqlServerSourceConfigFactory()
                    .hostname(MSSQL_SERVER_CONTAINER.getHost())
                    .port(PORT)
                    .username("sa")
                    .password("Password!")
                    .databaseList("column_type_test");
    SqlServerDialect sqlServerDialect =
            new SqlServerDialect(
                    (SqlServerSourceConfigFactory) configFactory, Collections.emptyList());
    String expectedMessage =
            "Table column_type_test.dbo.full_types_sink is not enabled for capture";
    try (JdbcConnection jdbc = sqlServerDialect.openJdbcConnection(configFactory.create(0))) {
        SeaTunnelException thrown =
                Assertions.assertThrows(
                        SeaTunnelException.class,
                        () ->
                                sqlServerDialect.checkAllTablesEnabledCapture(
                                        jdbc,
                                        Collections.singletonList(TableId.parse(SINK_TABLE))));
        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }
}

/**
* Executes a JDBC statement using the default jdbc config without autocommitting the
* connection.
Expand Down

0 comments on commit 9a5da78

Please sign in to comment.