diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java index c2816baf1e7..f3eb862d87b 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java @@ -19,6 +19,8 @@ public class SinkType { + public static final String ICEBERG = "ICEBERG"; + public static final String HIVE = "HIVE"; public static final String KAFKA = "KAFKA"; public static final String PULSAR = "PULSAR"; public static final String CLS = "CLS"; diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java index a82d574cacf..a22c4249b03 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java @@ -34,7 +34,8 @@ public class DirtyMessageWrapper { private static DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private String delimiter; + @Builder.Default + private String delimiter = "|"; @Builder.Default @Getter private int retryTimes = 0; diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java index 24c5dddecd0..76708799258 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java @@ -17,7 +17,6 @@ package org.apache.inlong.sort.base.dirty; -import org.apache.inlong.sort.base.dirty.sink.DirtyServerType; import org.apache.inlong.sort.base.util.PatternReplaceUtils; import org.apache.flink.table.types.logical.LogicalType; @@ -65,7 +64,7 @@ public class DirtyData { */ private final DirtyType dirtyType; - private final DirtyServerType serverType; + private final String serverType; /** * Dirty describe message, it is the cause of dirty data */ @@ -88,7 +87,7 @@ public class DirtyData { private final T data; public DirtyData(T data, String identifier, String labels, - String logTag, DirtyType dirtyType, DirtyServerType serverType, String dirtyMessage, + String logTag, DirtyType dirtyType, String serverType, String dirtyMessage, @Nullable LogicalType rowType, long dataTime, String extParams) { this.data = data; this.dirtyType = dirtyType; @@ -131,7 +130,7 @@ public DirtyType getDirtyType() { return dirtyType; } - public DirtyServerType getServerType() { + public String getServerType() { return serverType; } @@ -162,7 +161,7 @@ public static class Builder { private String labels; private String logTag; private DirtyType dirtyType = DirtyType.UNDEFINED; - private DirtyServerType serverType = DirtyServerType.UNDEFINED; + private String serverType; private String dirtyMessage; private LogicalType rowType; private long dataTime; @@ -184,7 +183,7 @@ public Builder setDirtyType(DirtyType dirtyType) { return this; } - public Builder setServerType(DirtyServerType serverType) { + public Builder setServerType(String serverType) { this.serverType = serverType; return this; } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java deleted file mode 100644 index 63f993c146a..00000000000 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.base.dirty.sink; - -public enum DirtyServerType { - - UNDEFINED("Undefined"), - TUBE_MQ("TubeMQ"), - ICEBERG("Iceberg") - - ; - - private final String format; - - DirtyServerType(String format) { - this.format = format; - } - - public String format() { - return format; - } -} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java index 94631e7cd39..2749213096d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java @@ -17,10 +17,10 @@ package org.apache.inlong.sort.tubemq.table; +import org.apache.inlong.common.constant.MQType; import org.apache.inlong.sort.base.dirty.DirtyData; import org.apache.inlong.sort.base.dirty.DirtyOptions; import org.apache.inlong.sort.base.dirty.DirtyType; -import org.apache.inlong.sort.base.dirty.sink.DirtyServerType; import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; @@ -144,7 +144,7 @@ public void deserialize(Message message, Collector out) throws IOExcept builder.setData(message.getData()) .setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR) - .setServerType(DirtyServerType.TUBE_MQ) + .setServerType(MQType.TUBEMQ) .setDirtyDataTime(dataTime) .setExtParams(message.getAttribute()) .setLabels(dirtyOptions.getLabels())