Skip to content

Commit

Permalink
move SupportSchemaEvolutionSink to StarRocksSink
Browse files Browse the repository at this point in the history
  • Loading branch information
jw-itq committed Nov 24, 2024
1 parent 84b59ba commit 573ec9c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,25 @@
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.schema.SchemaChangeType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
import org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportSaveMode {
implements SupportSaveMode, SupportSchemaEvolutionSink {

private final TableSchema tableSchema;
private final SinkConfig sinkConfig;
Expand Down Expand Up @@ -92,4 +96,13 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
public Optional<CatalogTable> getWriteCatalogTable() {
return Optional.of(catalogTable);
}

@Override
public List<SchemaChangeType> supports() {
return Arrays.asList(
SchemaChangeType.ADD_COLUMN,
SchemaChangeType.DROP_COLUMN,
SchemaChangeType.RENAME_COLUMN,
SchemaChangeType.UPDATE_COLUMN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;

import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.schema.SchemaChangeType;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
Expand Down Expand Up @@ -55,13 +53,12 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

@Slf4j
public class StarRocksSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportSchemaEvolutionSinkWriter, SupportSchemaEvolutionSink {
implements SupportSchemaEvolutionSinkWriter {

private StarRocksISerializer serializer;
private StarRocksSinkManager manager;
Expand Down Expand Up @@ -239,13 +236,4 @@ protected void replaceColumnByIndex(
}
}
}

@Override
public List<SchemaChangeType> supports() {
return Arrays.asList(
SchemaChangeType.ADD_COLUMN,
SchemaChangeType.DROP_COLUMN,
SchemaChangeType.RENAME_COLUMN,
SchemaChangeType.UPDATE_COLUMN);
}
}

0 comments on commit 573ec9c

Please sign in to comment.