Skip to content

Commit

Permalink
[Improve][Jdbc] Refactor ddl change
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Nov 25, 2024
1 parent e1010dc commit cf0a125
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 366 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
Expand Down Expand Up @@ -70,6 +71,12 @@ public JdbcRowConverter getRowConverter() {
return new MysqlJdbcRowConverter();
}

@Override
public TypeConverter<BasicTypeDefine> getTypeConverter() {
TypeConverter typeConverter = MySqlTypeConverter.DEFAULT_INSTANCE;
return typeConverter;
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new MySqlTypeMapper();
Expand Down Expand Up @@ -228,13 +235,9 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta
}

@Override
public String decorateWithComment(
String tableName, String basicSql, BasicTypeDefine typeBasicTypeDefine) {
public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
MysqlType nativeType = (MysqlType) typeBasicTypeDefine.getNativeType();
if (NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)) {
return basicSql;
}
return JdbcDialect.super.decorateWithComment(tableName, basicSql, typeBasicTypeDefine);
return !(NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public class OceanBaseMySqlTypeConverter
private static final String VECTOR_TYPE_NAME = "";
private static final String VECTOR_NAME = "VECTOR";

public static final OceanBaseMySqlTypeConverter INSTANCE = new OceanBaseMySqlTypeConverter();

@Override
public String identifier() {
return DatabaseIdentifier.OCENABASE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
Expand Down Expand Up @@ -73,6 +74,12 @@ public JdbcRowConverter getRowConverter() {
return new OceanBaseMysqlJdbcRowConverter();
}

@Override
public TypeConverter<BasicTypeDefine> getTypeConverter() {
TypeConverter typeConverter = OceanBaseMySqlTypeConverter.INSTANCE;
return typeConverter;
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new OceanBaseMySqlTypeMapper();
Expand Down Expand Up @@ -231,13 +238,9 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta
}

@Override
public String decorateWithComment(
String tableName, String basicSql, BasicTypeDefine typeBasicTypeDefine) {
public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
OceanBaseMysqlType nativeType = (OceanBaseMysqlType) typeBasicTypeDefine.getNativeType();
if (NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)) {
return basicSql;
}
return JdbcDialect.super.decorateWithComment(tableName, basicSql, typeBasicTypeDefine);
return !(NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
Expand Down Expand Up @@ -64,6 +70,11 @@ public JdbcRowConverter getRowConverter() {
return new OracleJdbcRowConverter();
}

@Override
public TypeConverter<BasicTypeDefine> getTypeConverter() {
return OracleTypeConverter.INSTANCE;
}

@Override
public String hashModForField(String fieldName, int mod) {
return "MOD(ORA_HASH(" + quoteIdentifier(fieldName) + ")," + mod + ")";
Expand Down Expand Up @@ -329,17 +340,146 @@ public Object[] sampleDataFromColumn(
}

@Override
public String decorateWithComment(
String tableName, String basicSql, BasicTypeDefine typeBasicTypeDefine) {
String comment = typeBasicTypeDefine.getComment();
StringBuilder sql = new StringBuilder(basicSql);
if (StringUtils.isNotBlank(comment)) {
String commentSql =
String.format(
"COMMENT ON COLUMN %s.%s IS '%s'",
tableName, quoteIdentifier(typeBasicTypeDefine.getName()), comment);
sql.append(";\n").append(commentSql);
public void applySchemaChange(
Connection connection, TablePath tablePath, AlterTableAddColumnEvent event)
throws SQLException {
List<String> ddlSQL = new ArrayList<>();
ddlSQL.add(buildUpdateColumnSQL(connection, tablePath, event));

if (event.getColumn().getComment() != null) {
ddlSQL.add(buildUpdateColumnCommentSQL(tablePath, event.getColumn()));
}

try (Statement statement = connection.createStatement()) {
for (String sql : ddlSQL) {
log.info("Executing add column SQL: " + sql);
statement.execute(sql);
}
}
}

@Override
public void applySchemaChange(
Connection connection, TablePath tablePath, AlterTableChangeColumnEvent event)
throws SQLException {
List<String> ddlSQL = new ArrayList<>();
if (event.getOldColumn() != null
&& !(event.getColumn().getName().equals(event.getOldColumn()))) {
StringBuilder sqlBuilder =
new StringBuilder()
.append("ALTER TABLE ")
.append(tableIdentifier(tablePath))
.append(" RENAME COLUMN ")
.append(quoteIdentifier(event.getOldColumn()))
.append(" TO ")
.append(quoteIdentifier(event.getColumn().getName()));
ddlSQL.add(sqlBuilder.toString());
}

try (Statement statement = connection.createStatement()) {
for (String sql : ddlSQL) {
log.info("Executing change column SQL: " + sql);
statement.execute(sql);
}
}

if (event.getColumn().getDataType() != null) {
applySchemaChange(
connection,
tablePath,
AlterTableModifyColumnEvent.modify(event.tableIdentifier(), event.getColumn()));
}
}

@Override
public void applySchemaChange(
Connection connection, TablePath tablePath, AlterTableModifyColumnEvent event)
throws SQLException {
List<String> ddlSQL = new ArrayList<>();
ddlSQL.add(buildUpdateColumnSQL(connection, tablePath, event));

if (event.getColumn().getComment() != null) {
ddlSQL.add(buildUpdateColumnCommentSQL(tablePath, event.getColumn()));
}

try (Statement statement = connection.createStatement()) {
for (String sql : ddlSQL) {
log.info("Executing modify column SQL: " + sql);
statement.execute(sql);
}
}
}

private String buildUpdateColumnSQL(
Connection connection, TablePath tablePath, AlterTableColumnEvent event)
throws SQLException {
String actionType;
Column column;
if (event instanceof AlterTableModifyColumnEvent) {
actionType = "MODIFY";
column = ((AlterTableModifyColumnEvent) event).getColumn();
} else if (event instanceof AlterTableAddColumnEvent) {
actionType = "ADD";
column = ((AlterTableAddColumnEvent) event).getColumn();
} else {
throw new IllegalArgumentException("Unsupported AlterTableColumnEvent: " + event);
}

boolean someCatalog = event.getSourceDialectName().equals(dialectName());
BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
String columnType = someCatalog ? column.getSourceType() : typeDefine.getColumnType();
StringBuilder sqlBuilder =
new StringBuilder()
.append("ALTER TABLE ")
.append(tableIdentifier(tablePath))
.append(" ")
.append(actionType)
.append(" ")
.append(quoteIdentifier(column.getName()))
.append(" ")
.append(columnType);
// Only decorate with default value when source dialect is same as sink dialect
// Todo Support for cross-database default values for ddl statements
if (column.getDefaultValue() != null && someCatalog) {
sqlBuilder.append(" ").append(sqlClauseWithDefaultValue(typeDefine));
}
if (event instanceof AlterTableModifyColumnEvent) {
boolean targetColumnNullable =
columnIsNullable(connection, tablePath, column.getName());
if (column.isNullable() != targetColumnNullable) {
sqlBuilder.append(" ").append(column.isNullable() ? "NULL" : "NOT NULL");
}
} else {
sqlBuilder.append(" ").append(column.isNullable() ? "NULL" : "NOT NULL");
}
return sqlBuilder.toString();
}

private String buildUpdateColumnCommentSQL(TablePath tablePath, Column column) {
return String.format(
"COMMENT ON COLUMN %s.%s IS '%s'",
tableIdentifier(tablePath), quoteIdentifier(column.getName()), column.getComment());
}

private boolean columnIsNullable(Connection connection, TablePath tablePath, String column)
throws SQLException {
String selectColumnSQL =
"SELECT"
+ " NULLABLE FROM"
+ " ALL_TAB_COLUMNS c"
+ " WHERE c.owner = '"
+ tablePath.getSchemaName()
+ "'"
+ " AND c.table_name = '"
+ tablePath.getTableName()
+ "'"
+ " AND c.column_name = '"
+ column
+ "'";
try (Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery(selectColumnSQL);
rs.next();
return rs.getString("NULLABLE").equals("Y");
}
return sql.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
Expand All @@ -50,9 +45,7 @@

import java.io.IOException;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

@Slf4j
public abstract class AbstractJdbcSinkWriter<ResourceT>
Expand All @@ -67,6 +60,8 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
protected JdbcConnectionProvider connectionProvider;
protected JdbcSinkConfig jdbcSinkConfig;
protected JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
protected TableSchemaChangeEventDispatcher tableSchemaChanger =
new TableSchemaChangeEventDispatcher();

@Override
public void applySchemaChange(SchemaChangeEvent event) throws IOException {
Expand All @@ -88,57 +83,7 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
}

protected void processSchemaChangeEvent(AlterTableColumnEvent event) throws IOException {
List<Column> columns = new ArrayList<>(tableSchema.getColumns());
switch (event.getEventType()) {
case SCHEMA_CHANGE_ADD_COLUMN:
AlterTableAddColumnEvent alterTableAddColumnEvent =
(AlterTableAddColumnEvent) event;
Column addColumn = alterTableAddColumnEvent.getColumn();
String afterColumn = alterTableAddColumnEvent.getAfterColumn();
if (StringUtils.isNotBlank(afterColumn)) {
Optional<Column> columnOptional =
columns.stream()
.filter(column -> afterColumn.equals(column.getName()))
.findFirst();
if (!columnOptional.isPresent()) {
columns.add(addColumn);
break;
}
columnOptional.ifPresent(
column -> {
int index = columns.indexOf(column);
columns.add(index + 1, addColumn);
});
} else {
columns.add(addColumn);
}
break;
case SCHEMA_CHANGE_DROP_COLUMN:
String dropColumn = ((AlterTableDropColumnEvent) event).getColumn();
columns.removeIf(column -> column.getName().equalsIgnoreCase(dropColumn));
break;
case SCHEMA_CHANGE_MODIFY_COLUMN:
Column modifyColumn = ((AlterTableModifyColumnEvent) event).getColumn();
replaceColumnByIndex(
event.getEventType(), columns, modifyColumn.getName(), modifyColumn);
break;
case SCHEMA_CHANGE_CHANGE_COLUMN:
AlterTableChangeColumnEvent alterTableChangeColumnEvent =
(AlterTableChangeColumnEvent) event;
Column changeColumn = alterTableChangeColumnEvent.getColumn();
String oldColumnName = alterTableChangeColumnEvent.getOldColumn();
replaceColumnByIndex(event.getEventType(), columns, oldColumnName, changeColumn);
break;
default:
throw new SeaTunnelException(
"Unsupported schemaChangeEvent for event type: " + event.getEventType());
}
this.tableSchema =
TableSchema.builder()
.columns(columns)
.primaryKey(tableSchema.getPrimaryKey())
.constraintKey(tableSchema.getConstraintKeys())
.build();
this.tableSchema = tableSchemaChanger.reset(tableSchema).apply(event);
reOpenOutputFormat(event);
}

Expand All @@ -148,7 +93,7 @@ protected void reOpenOutputFormat(AlterTableColumnEvent event) throws IOExceptio
dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
try (Connection connection =
refreshTableSchemaConnectionProvider.getOrEstablishConnection()) {
dialect.applySchemaChange(event, connection, sinkTablePath);
dialect.applySchemaChange(connection, sinkTablePath, event);
} catch (Throwable e) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
Expand All @@ -159,20 +104,4 @@ protected void reOpenOutputFormat(AlterTableColumnEvent event) throws IOExceptio
.build();
this.outputFormat.open();
}

protected void replaceColumnByIndex(
EventType eventType, List<Column> columns, String oldColumnName, Column newColumn) {
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
if (column.getName().equalsIgnoreCase(oldColumnName)) {
// rename ...... to ...... which just has column name
if (eventType.equals(EventType.SCHEMA_CHANGE_CHANGE_COLUMN)
&& newColumn.getDataType() == null) {
columns.set(i, column.rename(newColumn.getName()));
} else {
columns.set(i, newColumn);
}
}
}
}
}
Loading

0 comments on commit cf0a125

Please sign in to comment.