From 84b59baef9e15bcb8cf688f20e4075d506a92fd6 Mon Sep 17 00:00:00 2001 From: Shiwanming <1633138551@qq.com> Date: Sun, 24 Nov 2024 15:12:16 +0800 Subject: [PATCH] fix Implements SchemaEvolution API --- .../starrocks/sink/StarRocksSinkWriter.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java index cdd73ecf5c1..0c62d97eaeb 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java @@ -18,10 +18,13 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.sink; import org.apache.seatunnel.api.event.EventType; +import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink; +import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.schema.SchemaChangeType; import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent; import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent; @@ -52,11 +55,13 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; @Slf4j -public class StarRocksSinkWriter extends AbstractSinkWriter { +public class StarRocksSinkWriter extends AbstractSinkWriter + implements SupportSchemaEvolutionSinkWriter, SupportSchemaEvolutionSink { private StarRocksISerializer serializer; private StarRocksSinkManager manager; @@ -234,4 +239,13 @@ protected void replaceColumnByIndex( } } } + + @Override + public List supports() { + return Arrays.asList( + SchemaChangeType.ADD_COLUMN, + SchemaChangeType.DROP_COLUMN, + SchemaChangeType.RENAME_COLUMN, + SchemaChangeType.UPDATE_COLUMN); + } }