diff --git a/pom.xml b/pom.xml
index e050018b2b8..7c433655921 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,6 +159,7 @@
true
3.1.4
+ 15.0.1
@@ -489,6 +490,17 @@
provided
+
+ org.apache.arrow
+ arrow-vector
+ ${arrow.version}
+
+
+ org.apache.arrow
+ arrow-memory-netty
+ ${arrow.version}
+
+
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index 218ec7dd9d5..a80ca7fbcd0 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -63,6 +63,13 @@
optional
+
+ org.apache.seatunnel
+ seatunnel-arrow
+ ${project.version}
+ optional
+
+
commons-codec
commons-codec
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/Converter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/Converter.java
new file mode 100644
index 00000000000..4c367e14ae7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/Converter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter;
+
+import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types;
+
+import java.util.Map;
+import java.util.function.Function;
+
+public interface Converter { // Contract for turning the value at one row of an Arrow vector into a Java object. NOTE(review): T is used below but no type parameter is declared on this line — generic arguments look stripped from this diff; confirm against the real file.
+
+ String ARRAY_KEY = "ARRAY"; // genericsConverters key under which the list converters look up their element converter
+ String MAP_KEY = "KEY"; // presumably the genericsConverters key for a map's key converter — usage is not visible here, confirm in MapConverter
+ String MAP_VALUE = "VALUE"; // presumably the genericsConverters key for a map's value converter — usage is not visible here, confirm in MapConverter
+
+ Object convert(int rowIndex, T fieldVector); // plain conversion of the value at rowIndex of fieldVector
+
+ default Object convert(int rowIndex, T fieldVector, Map genericsConverters) { // variant for nested types that takes per-element converters; opt-in, implementations override it
+ throw new UnsupportedOperationException("Unsupported generics convert"); // default for converters that do not handle nested element conversion
+ }
+
+ boolean support(Types.MinorType type); // whether this converter handles vectors of the given Arrow minor type
+}
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/DefaultConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/DefaultConverter.java
new file mode 100644
index 00000000000..7a9b6d0a47e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/DefaultConverter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter;
+
+import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types;
+
+public class DefaultConverter implements Converter { // Pass-through converter: hands back whatever the vector's own getObject returns.
+
+ @Override
+ public Object convert(int rowIndex, FieldVector fieldVector) { // returns null for a null slot, otherwise the vector's object at rowIndex
+ return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex);
+ }
+
+ @Override
+ public boolean support(Types.MinorType type) { // never claims any minor type
+ return false; // presumably this converter is chosen explicitly as a fallback rather than by type matching — confirm against the caller
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/FixedSizeListConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/FixedSizeListConverter.java
new file mode 100644
index 00000000000..c325cc50cde
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/FixedSizeListConverter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter;
+
+import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.FixedSizeListVector;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class FixedSizeListConverter implements Converter { // Converts one row of an Arrow FixedSizeListVector into a Java list.
+ @Override
+ public Object convert(int rowIndex, FixedSizeListVector fieldVector) { // raw form: null for a null slot, otherwise the vector's own list object, elements untouched
+ return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex);
+ }
+
+ @Override
+ public Object convert( // element-converting form: maps every list element through the converter stored under ARRAY_KEY
+ int rowIndex,
+ FixedSizeListVector fieldVector,
+ Map genericsConverters) {
+ if (fieldVector.isNull(rowIndex)) { // a null slot yields null, not an empty list
+ return null;
+ }
+ List> listData = fieldVector.getObject(rowIndex); // NOTE(review): "List>" looks like a generic argument lost in extraction — confirm against the real file
+ Function converter = genericsConverters.get(ARRAY_KEY); // no null check: a missing ARRAY_KEY entry fails with a NullPointerException on the first element
+ return listData.stream()
+ .map(
+ item -> {
+ if (item instanceof LocalDateTime) { // date-time elements are zone-shifted before being converted
+ LocalDateTime localDateTime =
+ ((LocalDateTime) item)
+ .atZone(ZoneOffset.UTC) // read the stored wall-clock value as UTC
+ .withZoneSameInstant(ZoneId.systemDefault()) // same instant expressed in the JVM default zone
+ .toLocalDateTime();
+ return converter.apply(localDateTime);
+ } else {
+ return converter.apply(item); // every other element type goes to the converter unchanged
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean support(Types.MinorType type) { // handles only FIXED_SIZE_LIST vectors
+ return Types.MinorType.FIXED_SIZE_LIST == type;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/LargeListConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/LargeListConverter.java
new file mode 100644
index 00000000000..ed0d6ee26a9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/LargeListConverter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter;
+
+import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.LargeListVector;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class LargeListConverter implements Converter { // Converts one row of an Arrow LargeListVector into a Java list.
+ @Override
+ public Object convert(int rowIndex, LargeListVector fieldVector) { // raw form: null for a null slot, otherwise the vector's own list object, elements untouched
+ return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex);
+ }
+
+ @Override
+ public Object convert( // element-converting form: maps every list element through the converter stored under ARRAY_KEY
+ int rowIndex, LargeListVector fieldVector, Map genericsConverters) {
+ if (fieldVector.isNull(rowIndex)) { // a null slot yields null, not an empty list
+ return null;
+ }
+ if (fieldVector.isEmpty(rowIndex)) { // non-null but empty: return early without touching genericsConverters
+ return Collections.emptyList(); // immutable list, unlike the Collectors.toList() result returned below
+ }
+ List> listData = fieldVector.getObject(rowIndex); // NOTE(review): "List>" looks like a generic argument lost in extraction — confirm against the real file
+ Function converter = genericsConverters.get(ARRAY_KEY); // no null check: a missing ARRAY_KEY entry fails with a NullPointerException on the first element
+ return listData.stream()
+ .map(
+ item -> {
+ if (item instanceof LocalDateTime) { // date-time elements are zone-shifted before being converted
+ LocalDateTime localDateTime =
+ ((LocalDateTime) item)
+ .atZone(ZoneOffset.UTC) // read the stored wall-clock value as UTC
+ .withZoneSameInstant(ZoneId.systemDefault()) // same instant expressed in the JVM default zone
+ .toLocalDateTime();
+ return converter.apply(localDateTime);
+ } else {
+ return converter.apply(item); // every other element type goes to the converter unchanged
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean support(Types.MinorType type) { // handles only LARGELIST vectors
+ return Types.MinorType.LARGELIST == type;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/ListConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/ListConverter.java
new file mode 100644
index 00000000000..3424f56018a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/ListConverter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter;
+
+import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.ListVector;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class ListConverter implements Converter { // Converts one row of an Arrow ListVector into a Java list.
+ @Override
+ public Object convert(int rowIndex, ListVector fieldVector) { // raw form: null for a null slot, otherwise the vector's own list object, elements untouched
+ return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex);
+ }
+
+ @Override
+ public Object convert( // element-converting form: maps every list element through the converter stored under ARRAY_KEY
+ int rowIndex, ListVector fieldVector, Map genericsConverters) {
+ if (fieldVector.isNull(rowIndex)) { // a null slot yields null, not an empty list
+ return null;
+ }
+ if (fieldVector.isEmpty(rowIndex)) { // non-null but empty: return early without touching genericsConverters
+ return Collections.emptyList(); // immutable list, unlike the Collectors.toList() result returned below
+ }
+ List> listData = fieldVector.getObject(rowIndex); // NOTE(review): "List>" looks like a generic argument lost in extraction — confirm against the real file
+ Function converter = genericsConverters.get(ARRAY_KEY); // no null check: a missing ARRAY_KEY entry fails with a NullPointerException on the first element
+ return listData.stream()
+ .map(
+ item -> {
+ if (item instanceof LocalDateTime) { // date-time elements are zone-shifted before being converted
+ LocalDateTime localDateTime =
+ ((LocalDateTime) item)
+ .atZone(ZoneOffset.UTC) // read the stored wall-clock value as UTC
+ .withZoneSameInstant(ZoneId.systemDefault()) // same instant expressed in the JVM default zone
+ .toLocalDateTime();
+ return converter.apply(localDateTime);
+ } else {
+ return converter.apply(item); // every other element type goes to the converter unchanged
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean support(Types.MinorType type) { // handles only LIST vectors
+ return Types.MinorType.LIST == type;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/MapConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/MapConverter.java
new file mode 100644
index 00000000000..18ec3c6910e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/MapConverter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter;
+
+import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.MapVector;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.impl.UnionMapReader;
+import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+public class MapConverter implements Converter {
+ @Override
+ public Object convert(int rowIndex, MapVector fieldVector) {
+ return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex);
+ }
+
+ @Override
+ public Object convert(
+ int rowIndex, MapVector fieldVector, Map genericsConverters) {
+ UnionMapReader reader = fieldVector.getReader();
+ reader.setPosition(rowIndex);
+ Map
-
- org.apache.seatunnel
- seatunnel-arrow-5.0
- ${project.version}
- optional
-
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java
index 68d2eecfb51..e68366cf260 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java
@@ -27,8 +27,8 @@
import org.apache.seatunnel.connectors.doris.rest.models.Schema;
import org.apache.seatunnel.connectors.doris.source.DorisSourceTable;
import org.apache.seatunnel.connectors.doris.source.serialization.Routing;
-import org.apache.seatunnel.connectors.doris.source.serialization.RowBatch;
import org.apache.seatunnel.connectors.doris.util.SchemaUtils;
+import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader.ArrowToSeatunnelRowReader;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.sdk.thrift.TScanCloseParams;
@@ -60,12 +60,12 @@ public class DorisValueReader {
protected int offset = 0;
protected AtomicBoolean eos = new AtomicBoolean(false);
- protected RowBatch rowBatch;
+ protected ArrowToSeatunnelRowReader rowBatch;
// flag indicate if support deserialize Arrow to RowBatch asynchronously
protected boolean deserializeArrowToRowBatchAsync;
- protected BlockingQueue rowBatchBlockingQueue;
+ protected BlockingQueue rowBatchBlockingQueue;
private TScanOpenParams openParams;
protected String contextId;
protected Schema schema;
@@ -115,12 +115,12 @@ private BackendClient backendClient() {
private TScanOpenParams openParams() {
TScanOpenParams params = new TScanOpenParams();
- params.cluster = DORIS_DEFAULT_CLUSTER;
- params.database = partition.getDatabase();
- params.table = partition.getTable();
+ params.setCluster(DORIS_DEFAULT_CLUSTER);
+ params.setDatabase(partition.getDatabase());
+ params.setTable(partition.getTable());
- params.tablet_ids = Arrays.asList(partition.getTabletIds().toArray(new Long[] {}));
- params.opaqued_query_plan = partition.getQueryPlan();
+ params.setTabletIds(Arrays.asList(partition.getTabletIds().toArray(new Long[] {})));
+ params.setOpaquedQueryPlan(partition.getQueryPlan());
// max row number of one read batch
Integer batchSize = dorisSourceTable.getBatchSize();
Integer queryDorisTimeout = config.getRequestQueryTimeoutS();
@@ -158,8 +158,10 @@ public void run() {
TScanBatchResult nextResult = client.getNext(nextBatchParams);
eos.set(nextResult.isEos());
if (!eos.get()) {
- RowBatch rowBatch =
- new RowBatch(nextResult, seaTunnelRowType)
+ ArrowToSeatunnelRowReader rowBatch =
+ new ArrowToSeatunnelRowReader(
+ nextResult.getRows(),
+ seaTunnelRowType)
.readArrow();
offset += rowBatch.getReadRowCount();
rowBatch.close();
@@ -233,7 +235,10 @@ public boolean hasNext() {
TScanBatchResult nextResult = client.getNext(nextBatchParams);
eos.set(nextResult.isEos());
if (!eos.get()) {
- rowBatch = new RowBatch(nextResult, seaTunnelRowType).readArrow();
+ rowBatch =
+ new ArrowToSeatunnelRowReader(
+ nextResult.getRows(), seaTunnelRowType)
+ .readArrow();
}
}
hasNext = !eos.get();
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
deleted file mode 100644
index a1f1c678aa4..00000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java
+++ /dev/null
@@ -1,742 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.source.serialization;
-
-import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
-import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.BigIntVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.BitVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.DateDayVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.DecimalVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.FixedSizeBinaryVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.Float4Vector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.Float8Vector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.IntVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.SmallIntVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.TinyIntVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.VarCharVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.ListVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.MapVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.StructVector;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.impl.UnionMapReader;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types.MinorType;
-import org.apache.seatunnel.shade.org.apache.arrow.vector.util.Text;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.DecimalArrayType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.common.utils.DateTimeUtils;
-import org.apache.seatunnel.common.utils.DateUtils;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-
-import org.apache.doris.sdk.thrift.TScanBatchResult;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.function.IntFunction;
-
-@Slf4j
-public class RowBatch {
- SeaTunnelDataType>[] fieldTypes;
- private final ArrowStreamReader arrowStreamReader;
- private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
- private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
- private final DateTimeFormatter dateTimeV2Formatter =
- DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
- private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
- // offset for iterate the rowBatch
- private int offsetInRowBatch = 0;
- private int rowCountInOneBatch = 0;
- private int readRowCount = 0;
- private List seatunnelRowBatch = new ArrayList<>();
- private VectorSchemaRoot root;
- private List fieldVectors;
- private RootAllocator rootAllocator;
-
- public RowBatch(TScanBatchResult nextResult, SeaTunnelRowType seaTunnelRowType) {
- this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
- this.arrowStreamReader =
- new ArrowStreamReader(
- new ByteArrayInputStream(nextResult.getRows()), rootAllocator);
- this.offsetInRowBatch = 0;
- this.fieldTypes = seaTunnelRowType.getFieldTypes();
- }
-
- public RowBatch readArrow() {
- try {
- this.root = arrowStreamReader.getVectorSchemaRoot();
- while (arrowStreamReader.loadNextBatch()) {
- fieldVectors = root.getFieldVectors();
- // Adapt unique model hidden columns
- for (int i = 0; i < fieldVectors.size(); i++) {
- String fieldName = fieldVectors.get(i).getField().getName();
- if (fieldName.equals("__DORIS_DELETE_SIGN__")) {
- fieldVectors.remove(fieldVectors.get(i));
- }
- }
- if (fieldVectors.size() != fieldTypes.length) {
- log.error(
- "Schema size '{}' is not equal to arrow field size '{}'.",
- fieldVectors.size(),
- fieldTypes.length);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.ARROW_READ_FAILED,
- "Load Doris data failed, schema size of fetch data is wrong.");
- }
- if (fieldVectors.size() == 0 || root.getRowCount() == 0) {
- log.debug("One batch in arrow has no data.");
- continue;
- }
- rowCountInOneBatch = root.getRowCount();
- // init the rowBatch
- for (int i = 0; i < rowCountInOneBatch; ++i) {
- seatunnelRowBatch.add((new SeaTunnelRow(fieldVectors.size())));
- }
- convertArrowToRowBatch();
- readRowCount += root.getRowCount();
- }
- return this;
- } catch (Throwable e) {
- log.error("Read Doris Data failed because: ", e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.ARROW_READ_FAILED, e.getMessage());
- } finally {
- close();
- }
- }
-
- public boolean hasNext() {
- return offsetInRowBatch < readRowCount;
- }
-
- private void addValueToRow(int rowIndex, int colIndex, Object obj) {
- if (rowIndex > rowCountInOneBatch) {
- String errMsg =
- "Get row offset: " + rowIndex + " larger than row size: " + rowCountInOneBatch;
- log.error(errMsg);
- throw new NoSuchElementException(errMsg);
- }
- seatunnelRowBatch.get(readRowCount + rowIndex).setField(colIndex, obj);
- }
-
- public void convertArrowToRowBatch() throws DorisConnectorException {
- try {
- for (int col = 0; col < fieldVectors.size(); col++) {
- SeaTunnelDataType> dataType = fieldTypes[col];
- final String currentType = dataType.getSqlType().name();
-
- FieldVector fieldVector = fieldVectors.get(col);
- Types.MinorType minorType = fieldVector.getMinorType();
- convertArrowValue(col, currentType, dataType, minorType, fieldVector);
- }
- } catch (Throwable e) {
- close();
- throw e;
- }
- }
-
- private void convertArrowValue(
- int col,
- String currentType,
- SeaTunnelDataType> dataType,
- MinorType minorType,
- FieldVector fieldVector) {
- switch (currentType) {
- case "BOOLEAN":
- BitVector bitVector = (BitVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.BIT),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex ->
- bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0);
- break;
- case "TINYINT":
- TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.TINYINT),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex ->
- tinyIntVector.isNull(rowIndex)
- ? null
- : tinyIntVector.get(rowIndex));
- break;
- case "SMALLINT":
- if (fieldVector instanceof BitVector) {
- BitVector bv = (BitVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.BIT),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col, rowIndex -> bv.isNull(rowIndex) ? null : (short) bv.get(rowIndex));
-
- } else if (fieldVector instanceof TinyIntVector) {
- TinyIntVector tv = (TinyIntVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(MinorType.TINYINT),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col, rowIndex -> tv.isNull(rowIndex) ? null : (short) tv.get(rowIndex));
-
- } else {
- SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.SMALLINT),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex ->
- smallIntVector.isNull(rowIndex)
- ? null
- : smallIntVector.get(rowIndex));
- }
- break;
- case "INT":
- IntVector intVector = (IntVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.INT),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex -> intVector.isNull(rowIndex) ? null : intVector.get(rowIndex));
- break;
- case "BIGINT":
- BigIntVector bigIntVector = (BigIntVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.BIGINT),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex ->
- bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex));
- break;
- case "FLOAT":
- Float4Vector float4Vector = (Float4Vector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.FLOAT4),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex ->
- float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex));
- break;
- case "DOUBLE":
- Float8Vector float8Vector = (Float8Vector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.FLOAT8),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex ->
- float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex));
- break;
- case "DECIMAL":
- // LARGEINT
- if (fieldVector instanceof FixedSizeBinaryVector) {
- FixedSizeBinaryVector largeInitFixedSizeBinaryVector =
- (FixedSizeBinaryVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.FIXEDSIZEBINARY),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (largeInitFixedSizeBinaryVector.isNull(rowIndex)) {
- return null;
- }
- byte[] bytes = largeInitFixedSizeBinaryVector.get(rowIndex);
- int left = 0, right = bytes.length - 1;
- while (left < right) {
- byte temp = bytes[left];
- bytes[left] = bytes[right];
- bytes[right] = temp;
- left++;
- right--;
- }
- return new BigDecimal(new BigInteger(bytes), 0);
- });
- break;
- } else if (fieldVector instanceof VarCharVector) {
- VarCharVector varCharVector = (VarCharVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex ->
- varCharVector.isNull(rowIndex)
- ? null
- : new BigDecimal(
- new String(varCharVector.get(rowIndex))));
- break;
- }
- DecimalVector decimalVector = (DecimalVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.DECIMAL),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex ->
- decimalVector.isNull(rowIndex)
- ? null
- : decimalVector.getObject(rowIndex).stripTrailingZeros());
- break;
- case "DATE":
- case "DATEV2":
- if (fieldVector instanceof DateDayVector) {
- DateDayVector dateVector = (DateDayVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.DATEDAY),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (dateVector.isNull(rowIndex)) {
- return null;
- }
- return LocalDate.ofEpochDay(dateVector.get(rowIndex));
- });
- break;
- }
- VarCharVector dateVector = (VarCharVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (dateVector.isNull(rowIndex)) {
- return null;
- }
- String stringValue = new String(dateVector.get(rowIndex));
- return LocalDate.parse(stringValue, dateFormatter);
- });
- break;
- case "TIMESTAMP":
- if (fieldVector instanceof TimeStampMicroVector) {
- TimeStampMicroVector timestampVector = (TimeStampMicroVector) fieldVector;
-
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (timestampVector.isNull(rowIndex)) {
- return null;
- }
- String stringValue = timestampVector.getObject(rowIndex).toString();
- stringValue = completeMilliseconds(stringValue);
-
- return DateTimeUtils.parse(stringValue);
- });
- break;
- }
- VarCharVector timestampVector = (VarCharVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (timestampVector.isNull(rowIndex)) {
- return null;
- }
- String stringValue = new String(timestampVector.get(rowIndex));
- stringValue = completeMilliseconds(stringValue);
- return LocalDateTime.parse(stringValue, dateTimeV2Formatter);
- });
- break;
- case "STRING":
- if (fieldVector instanceof FixedSizeBinaryVector) {
- FixedSizeBinaryVector fixedSizeBinaryVector =
- (FixedSizeBinaryVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.FIXEDSIZEBINARY),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (fixedSizeBinaryVector.isNull(rowIndex)) {
- return null;
- }
- byte[] bytes = fixedSizeBinaryVector.get(rowIndex);
- int left = 0, right = bytes.length - 1;
- while (left < right) {
- byte temp = bytes[left];
- bytes[left] = bytes[right];
- bytes[right] = temp;
- left++;
- right--;
- }
- return new BigInteger(bytes).toString();
- });
- break;
- } else if (fieldVector instanceof MapVector) {
- MapVector mapVector = (MapVector) fieldVector;
- UnionMapReader reader = mapVector.getReader();
- Preconditions.checkArgument(
- minorType.equals(MinorType.MAP),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (mapVector.isNull(rowIndex)) {
- return null;
- }
- reader.setPosition(rowIndex);
- Map mapValue = new HashMap<>();
- while (reader.next()) {
- mapValue.put(
- reader.key().readObject().toString(),
- reader.value().readObject());
- }
- return mapValue.toString();
- });
- } else if (fieldVector instanceof StructVector) {
- StructVector structVector = (StructVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(MinorType.STRUCT),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (structVector.isNull(rowIndex)) {
- return null;
- }
- Map structValue = structVector.getObject(rowIndex);
- return structValue.toString();
- });
- } else if (fieldVector instanceof ListVector) {
- ListVector listVector = (ListVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.LIST),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (listVector.isNull(rowIndex)) {
- return null;
- }
- List> listVectorObject = listVector.getObject(rowIndex);
- return Arrays.toString(listVectorObject.toArray());
- });
- } else {
- VarCharVector varCharVector = (VarCharVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex ->
- varCharVector.isNull(rowIndex)
- ? null
- : new String(varCharVector.get(rowIndex)));
- }
- break;
- case "ARRAY":
- ListVector listVector = (ListVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(Types.MinorType.LIST),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForArrayColumn(dataType, col, listVector);
- break;
- case "MAP":
- MapVector mapVector = (MapVector) fieldVector;
- UnionMapReader reader = mapVector.getReader();
- Preconditions.checkArgument(
- minorType.equals(MinorType.MAP),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForMapColumn(dataType, col, mapVector, reader);
-
- break;
- case "STRUCT":
- StructVector structVector = (StructVector) fieldVector;
- Preconditions.checkArgument(
- minorType.equals(MinorType.STRUCT),
- typeMismatchMessage(currentType, minorType));
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (structVector.isNull(rowIndex)) {
- return null;
- }
- Map structValue = structVector.getObject(rowIndex);
- return structValue;
- });
- break;
- default:
- String errMsg = "Unsupported type " + fieldTypes[col].getSqlType().name();
- log.error(errMsg);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.ARROW_READ_FAILED, errMsg);
- }
- }
-
- private void addValueToRowForMapColumn(
- SeaTunnelDataType> dataType, int col, MapVector mapVector, UnionMapReader reader) {
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (mapVector.isNull(rowIndex)) {
- return null;
- }
- reader.setPosition(rowIndex);
- Map mapValue = new HashMap<>();
- MapType mapType = (MapType) dataType;
- SqlType keyType = mapType.getKeyType().getSqlType();
- SqlType valueType = mapType.getValueType().getSqlType();
- while (reader.next()) {
- mapValue.put(
- getDataFromVector(reader.key().readObject(), keyType),
- getDataFromVector(reader.value().readObject(), valueType));
- }
- return mapValue;
- });
- }
-
- private Object getDataFromVector(Object vectorObject, SqlType sqlType) {
- if (vectorObject instanceof Boolean) {
- return Boolean.valueOf(vectorObject.toString());
- }
-
- if (vectorObject instanceof Byte) {
- return Byte.valueOf(vectorObject.toString());
- }
-
- if (vectorObject instanceof Short) {
- return Short.valueOf(vectorObject.toString());
- }
-
- if (vectorObject instanceof Integer) {
- if (sqlType.equals(SqlType.DATE)) {
- return LocalDate.ofEpochDay((int) vectorObject);
- }
- return Integer.valueOf(vectorObject.toString());
- }
-
- if (vectorObject instanceof Long) {
- return Long.valueOf(vectorObject.toString());
- }
-
- if (vectorObject instanceof Float) {
- return Float.valueOf(vectorObject.toString());
- }
-
- if (vectorObject instanceof Double) {
- return Double.valueOf(vectorObject.toString());
- }
-
- if (vectorObject instanceof Text) {
- if (sqlType.equals(SqlType.TIMESTAMP)) {
- String stringValue = completeMilliseconds(vectorObject.toString());
- return LocalDateTime.parse(stringValue, dateTimeV2Formatter);
- } else if (sqlType.equals(SqlType.DATE)) {
- return LocalDate.parse(vectorObject.toString(), dateFormatter);
- } else if (sqlType.equals(SqlType.DECIMAL)) {
- return new BigDecimal(vectorObject.toString());
- }
- return vectorObject.toString();
- }
-
- if (vectorObject instanceof BigDecimal) {
- return new BigDecimal(vectorObject.toString());
- }
-
- if (vectorObject instanceof byte[] && sqlType.equals(SqlType.DECIMAL)) {
- byte[] bytes = (byte[]) vectorObject;
- int left = 0, right = bytes.length - 1;
- while (left < right) {
- byte temp = bytes[left];
- bytes[left] = bytes[right];
- bytes[right] = temp;
- left++;
- right--;
- }
- return new BigDecimal(new BigInteger(bytes), 0);
- }
- if (vectorObject instanceof LocalDate) {
- return DateUtils.parse(vectorObject.toString());
- }
-
- if (vectorObject instanceof LocalDateTime) {
- return DateTimeUtils.parse(vectorObject.toString());
- }
-
- return vectorObject.toString();
- }
-
- private void addValueToRowForArrayColumn(
- SeaTunnelDataType> dataType, int col, ListVector listVector) {
- SqlType eleSqlType = null;
- if (dataType instanceof ArrayType) {
- ArrayType arrayType = (ArrayType) dataType;
- eleSqlType = arrayType.getElementType().getSqlType();
- }
- SqlType finalEleSqlType = eleSqlType;
- addValueToRowForAllRows(
- col,
- rowIndex -> {
- if (listVector.isNull(rowIndex)) {
- return null;
- }
- List> listVectorObject = listVector.getObject(rowIndex);
- if (listVectorObject.get(0) instanceof Boolean) {
- return listVectorObject.stream()
- .map(x -> getDataFromVector(x, finalEleSqlType))
- .toArray(Boolean[]::new);
- }
-
- if (listVectorObject.get(0) instanceof Byte) {
- return listVectorObject.stream()
- .map(x -> getDataFromVector(x, finalEleSqlType))
- .toArray(Byte[]::new);
- }
-
- if (listVectorObject.get(0) instanceof Short) {
- return listVectorObject.stream()
- .map(x -> getDataFromVector(x, finalEleSqlType))
- .toArray(Short[]::new);
- }
-
- if (listVectorObject.get(0) instanceof Integer) {
- return listVectorObject.stream()
- .map(x -> getDataFromVector(x, finalEleSqlType))
- .toArray(Integer[]::new);
- }
-
- if (listVectorObject.get(0) instanceof Long) {
- return listVectorObject.stream()
- .map(x -> getDataFromVector(x, finalEleSqlType))
- .toArray(Long[]::new);
- }
-
- if (listVectorObject.get(0) instanceof Float) {
- return listVectorObject.stream()
- .map(x -> getDataFromVector(x, finalEleSqlType))
- .toArray(Float[]::new);
- }
-
- if (listVectorObject.get(0) instanceof Double) {
- return listVectorObject.stream()
- .map(x -> getDataFromVector(x, finalEleSqlType))
- .toArray(Double[]::new);
- }
-
- if (listVectorObject.get(0) instanceof Text) {
- return listVectorObject.stream()
- .map(x -> getDataFromVector(x, finalEleSqlType))
- .toArray(String[]::new);
- }
-
- if (listVectorObject.get(0) instanceof BigDecimal) {
- return listVectorObject.stream()
- .map(x -> getDataFromVector(x, finalEleSqlType))
- .toArray(BigDecimal[]::new);
- }
-
- if (listVectorObject.get(0) instanceof byte[]
- && dataType instanceof DecimalArrayType) {
- return listVectorObject.stream()
- .map(x -> getDataFromVector(x, finalEleSqlType))
- .toArray(BigDecimal[]::new);
- }
-
- return listVectorObject.toArray();
- });
- }
-
- private void addValueToRowForAllRows(int col, IntFunction function) {
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- Object fieldValue = function.apply(rowIndex);
- addValueToRow(rowIndex, col, fieldValue);
- }
- }
-
- private String completeMilliseconds(String stringValue) {
- if (stringValue.length() == DATETIMEV2_PATTERN.length()) {
- return stringValue;
- }
- StringBuilder sb = new StringBuilder(stringValue);
- if (stringValue.length() == DATETIME_PATTERN.length()) {
- sb.append(".");
- }
- while (sb.toString().length() < DATETIMEV2_PATTERN.length()) {
- sb.append(0);
- }
- return sb.toString();
- }
-
- public SeaTunnelRow next() {
- if (!hasNext()) {
- String errMsg =
- "Get row offset:" + offsetInRowBatch + " larger than row size: " + readRowCount;
- log.error(errMsg);
- throw new NoSuchElementException(errMsg);
- }
- return seatunnelRowBatch.get(offsetInRowBatch++);
- }
-
- private String typeMismatchMessage(final String flinkType, final Types.MinorType arrowType) {
- final String messageTemplate = "FLINK type is %1$s, but arrow type is %2$s.";
- return String.format(messageTemplate, flinkType, arrowType.name());
- }
-
- public int getReadRowCount() {
- return readRowCount;
- }
-
- public void close() {
- try {
- if (arrowStreamReader != null) {
- arrowStreamReader.close();
- }
- if (rootAllocator != null) {
- rootAllocator.close();
- }
- } catch (IOException ioe) {
- throw new DorisConnectorException(
- DorisConnectorErrorCode.ROW_BATCH_GET_FAILED,
- "Failed to close ArrowStreamReader",
- ioe);
- }
- }
-}
diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index 08e49bc0f02..6653610de4e 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -55,11 +55,6 @@
${mysql.version}
provided
-
- org.apache.seatunnel
- seatunnel-common
- ${project.version}
-
org.apache.httpcomponents
httpclient
@@ -75,52 +70,6 @@
starrocks-thrift-sdk
${starrocks.thrift.sdk.version}
-
- org.apache.arrow
- arrow-vector
- ${arrow.version}
-
-
- org.apache.arrow
- arrow-memory-netty
- ${arrow.version}
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
-
-
-
- org.apache.arrow
- ${seatunnel.shade.package}.${connector.name}.org.apache.arrow
-
-
- io.netty
- ${seatunnel.shade.package}.${connector.name}.io.netty
-
-
- com.google.flatbuffers
- ${seatunnel.shade.package}.${connector.name}.com.google.flatbuffers
-
-
- com.fasterxml.jackson
- ${seatunnel.shade.package}.${connector.name}.com.fasterxml.jackson
-
-
-
-
-
-
- shade
-
- package
-
-
-
-
-
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
index 07a5a03eba7..c0be0106bb0 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java
@@ -19,6 +19,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader.ArrowToSeatunnelRowReader;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
@@ -56,7 +57,7 @@ public class StarRocksBeReadClient implements Serializable {
private int readerOffset = 0;
private final SourceConfig sourceConfig;
private SeaTunnelRowType seaTunnelRowType;
- private StarRocksRowBatchReader rowBatch;
+ private ArrowToSeatunnelRowReader rowBatch;
protected AtomicBoolean eos = new AtomicBoolean(false);
public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) {
@@ -162,7 +163,10 @@ public boolean hasNext() {
}
eos.set(result.isEos());
if (!eos.get()) {
- rowBatch = new StarRocksRowBatchReader(result, seaTunnelRowType).readArrow();
+
+ rowBatch =
+ new ArrowToSeatunnelRowReader(result.getRows(), seaTunnelRowType)
+ .readArrow();
}
} catch (TException e) {
throw new StarRocksConnectorException(
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksRowBatchReader.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksRowBatchReader.java
deleted file mode 100644
index 2ea7e98c7e5..00000000000
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksRowBatchReader.java
+++ /dev/null
@@ -1,357 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.seatunnel.connectors.seatunnel.starrocks.client.source;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
-
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.Float8Vector;
-import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TinyIntVector;
-import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.types.Types;
-
-import com.starrocks.thrift.TScanBatchResult;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
-
-@Slf4j
-public class StarRocksRowBatchReader {
-
- private SeaTunnelDataType>[] seaTunnelDataTypes;
- private int offsetInRowBatch = 0;
- private int rowCountInOneBatch = 0;
- private int readRowCount = 0;
- private List seaTunnelRowBatch = new ArrayList<>();
- private final ArrowStreamReader arrowStreamReader;
- private VectorSchemaRoot root;
- private List fieldVectors;
- private RootAllocator rootAllocator;
- private final DateTimeFormatter dateTimeFormatter =
- DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
-
- public StarRocksRowBatchReader(TScanBatchResult nextResult, SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelDataTypes = seaTunnelRowType.getFieldTypes();
- this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
- this.arrowStreamReader =
- new ArrowStreamReader(
- new ByteArrayInputStream(nextResult.getRows()), rootAllocator);
- }
-
- public StarRocksRowBatchReader readArrow() {
- try {
- this.root = arrowStreamReader.getVectorSchemaRoot();
- while (arrowStreamReader.loadNextBatch()) {
- fieldVectors = root.getFieldVectors();
- if (fieldVectors.size() != seaTunnelDataTypes.length) {
- log.error(
- "seaTunnel schema size '{}' is not equal to arrow field size '{}'.",
- fieldVectors.size(),
- seaTunnelDataTypes.length);
- throw new StarRocksConnectorException(
- StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED,
- "schema size of fetch data is wrong.");
- }
- if (fieldVectors.size() == 0 || root.getRowCount() == 0) {
- log.debug("one batch in arrow has no data.");
- continue;
- }
- log.info("one batch in arrow row count size '{}'", root.getRowCount());
- rowCountInOneBatch = root.getRowCount();
- // init the rowBatch
- for (int i = 0; i < rowCountInOneBatch; ++i) {
- seaTunnelRowBatch.add(new SeaTunnelRow(fieldVectors.size()));
- }
-
- convertArrowToRowBatch();
- readRowCount += root.getRowCount();
- }
- return this;
- } catch (Exception e) {
- throw new StarRocksConnectorException(
- StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED, e);
- } finally {
- close();
- }
- }
-
- public boolean hasNext() {
- if (offsetInRowBatch < readRowCount) {
- return true;
- }
- return false;
- }
-
- private void addValueToRow(int rowIndex, int colIndex, Object obj) {
- if (rowIndex > rowCountInOneBatch) {
- throw new StarRocksConnectorException(
- StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED,
- String.format(
- "Get row offset: %d larger than row size: %d",
- rowIndex, rowCountInOneBatch));
- }
- seaTunnelRowBatch.get(readRowCount + rowIndex).setField(colIndex, obj);
- }
-
- public void convertArrowToRowBatch() {
- try {
- for (int col = 0; col < fieldVectors.size(); col++) {
- SeaTunnelDataType> dataType = seaTunnelDataTypes[col];
- final String currentType = dataType.getSqlType().name();
-
- FieldVector curFieldVector = fieldVectors.get(col);
- Types.MinorType mt = curFieldVector.getMinorType();
- switch (dataType.getSqlType()) {
- case BOOLEAN:
- checkArgument(
- mt.equals(Types.MinorType.BIT),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- BitVector bitVector = (BitVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- Object fieldValue =
- bitVector.isNull(rowIndex)
- ? null
- : bitVector.get(rowIndex) != 0;
- addValueToRow(rowIndex, col, fieldValue);
- }
- break;
- case TINYINT:
- checkArgument(
- mt.equals(Types.MinorType.TINYINT),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- TinyIntVector tinyIntVector = (TinyIntVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- Object fieldValue =
- tinyIntVector.isNull(rowIndex)
- ? null
- : tinyIntVector.get(rowIndex);
- addValueToRow(rowIndex, col, fieldValue);
- }
- break;
- case SMALLINT:
- checkArgument(
- mt.equals(Types.MinorType.SMALLINT),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- SmallIntVector smallIntVector = (SmallIntVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- Object fieldValue =
- smallIntVector.isNull(rowIndex)
- ? null
- : smallIntVector.get(rowIndex);
- addValueToRow(rowIndex, col, fieldValue);
- }
- break;
- case INT:
- checkArgument(
- mt.equals(Types.MinorType.INT),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- IntVector intVector = (IntVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- Object fieldValue =
- intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
- addValueToRow(rowIndex, col, fieldValue);
- }
- break;
- case BIGINT:
- checkArgument(
- mt.equals(Types.MinorType.BIGINT),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- BigIntVector bigIntVector = (BigIntVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- Object fieldValue =
- bigIntVector.isNull(rowIndex)
- ? null
- : bigIntVector.get(rowIndex);
- addValueToRow(rowIndex, col, fieldValue);
- }
- break;
- case FLOAT:
- checkArgument(
- mt.equals(Types.MinorType.FLOAT4),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- Float4Vector float4Vector = (Float4Vector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- Object fieldValue =
- float4Vector.isNull(rowIndex)
- ? null
- : float4Vector.get(rowIndex);
- addValueToRow(rowIndex, col, fieldValue);
- }
- break;
- case DOUBLE:
- checkArgument(
- mt.equals(Types.MinorType.FLOAT8),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- Float8Vector float8Vector = (Float8Vector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- Object fieldValue =
- float8Vector.isNull(rowIndex)
- ? null
- : float8Vector.get(rowIndex);
- addValueToRow(rowIndex, col, fieldValue);
- }
- break;
- case DECIMAL:
- checkArgument(
- mt.equals(Types.MinorType.DECIMAL),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- DecimalVector decimalVector = (DecimalVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- if (decimalVector.isNull(rowIndex)) {
- addValueToRow(rowIndex, col, null);
- continue;
- }
- BigDecimal value = decimalVector.getObject(rowIndex);
- addValueToRow(rowIndex, col, value);
- }
- break;
- case DATE:
- checkArgument(
- mt.equals(Types.MinorType.VARCHAR),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- VarCharVector date = (VarCharVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- if (date.isNull(rowIndex)) {
- addValueToRow(rowIndex, col, null);
- continue;
- }
- String value = new String(date.get(rowIndex));
- LocalDate localDate = LocalDate.parse(value, dateFormatter);
- addValueToRow(rowIndex, col, localDate);
- }
- break;
- case TIMESTAMP:
- checkArgument(
- mt.equals(Types.MinorType.VARCHAR),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- VarCharVector timeStampSecVector = (VarCharVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- if (timeStampSecVector.isNull(rowIndex)) {
- addValueToRow(rowIndex, col, null);
- continue;
- }
- String value = new String(timeStampSecVector.get(rowIndex));
- LocalDateTime parse = LocalDateTime.parse(value, dateTimeFormatter);
- addValueToRow(rowIndex, col, parse);
- }
- break;
- case STRING:
- checkArgument(
- mt.equals(Types.MinorType.VARCHAR),
- "seaTunnel type is %1$s, but arrow type is %2$s.",
- currentType,
- mt.name());
- VarCharVector varCharVector = (VarCharVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
- if (varCharVector.isNull(rowIndex)) {
- addValueToRow(rowIndex, col, null);
- continue;
- }
- String value = new String(varCharVector.get(rowIndex));
- addValueToRow(rowIndex, col, value);
- }
- break;
- default:
- throw new StarRocksConnectorException(
- StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED,
- String.format(
- "Unsupported type %s",
- seaTunnelDataTypes[col].getSqlType().name()));
- }
- }
- } catch (Exception e) {
- close();
- throw new StarRocksConnectorException(
- StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED, e);
- }
- }
-
- public SeaTunnelRow next() {
- if (!hasNext()) {
- throw new StarRocksConnectorException(
- StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED,
- String.format(
- "Get row offset: %d larger than row size: %d",
- offsetInRowBatch, readRowCount));
- }
- return seaTunnelRowBatch.get(offsetInRowBatch++);
- }
-
- public int getReadRowCount() {
- return readRowCount;
- }
-
- public void close() {
- try {
- if (arrowStreamReader != null) {
- arrowStreamReader.close();
- }
- if (rootAllocator != null) {
- rootAllocator.close();
- }
- } catch (IOException e) {
- throw new StarRocksConnectorException(
- StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED,
- "Failed to close ArrowStreamReader",
- e);
- }
- }
-}
diff --git a/seatunnel-shade/pom.xml b/seatunnel-shade/pom.xml
index 2b378739de7..0011f6b37d2 100644
--- a/seatunnel-shade/pom.xml
+++ b/seatunnel-shade/pom.xml
@@ -30,13 +30,12 @@
seatunnel-hadoop3-3.1.4-uber
seatunnel-jackson
seatunnel-guava
- seatunnel-arrow-5.0
seatunnel-thrift-service
seatunnel-hazelcast
seatunnel-janino
seatunnel-jetty9-9.4.56
seatunnel-hadoop-aws
-
+ seatunnel-arrow
diff --git a/seatunnel-shade/seatunnel-arrow-5.0/pom.xml b/seatunnel-shade/seatunnel-arrow/pom.xml
similarity index 78%
rename from seatunnel-shade/seatunnel-arrow-5.0/pom.xml
rename to seatunnel-shade/seatunnel-arrow/pom.xml
index f0d1f2b32b5..27b49ef731c 100644
--- a/seatunnel-shade/seatunnel-arrow-5.0/pom.xml
+++ b/seatunnel-shade/seatunnel-arrow/pom.xml
@@ -22,11 +22,11 @@
${revision}
- seatunnel-arrow-5.0
- SeaTunnel : Shade : Arrow5.0
+ seatunnel-arrow
+ SeaTunnel : Shade : Arrow
- 5.0.0
+ 15.0.1
@@ -39,36 +39,12 @@
org.apache.arrow
arrow-memory-netty
${arrow.version}
-
-
- com.fasterxml.jackson.core
- jackson-annotations
-
-
- com.fasterxml.jackson.core
- jackson-core
-
-
- com.fasterxml.jackson.core
- jackson-databind
-
-
- io.netty
- netty-common
-
-
-
-
- io.netty
- netty-common
- 4.1.77.Final
- org.apache.maven.plugins
maven-shade-plugin
@@ -77,8 +53,8 @@
package
- seatunnel-arrow-5.0
- ${enableSourceJarCreation}
+ seatunnel-arrow
+ true
true
false
false
@@ -97,13 +73,17 @@
org.apache.arrow
${seatunnel.shade.package}.org.apache.arrow
+
+ io.netty
+ ${seatunnel.shade.package}.io.netty
+
com.google.flatbuffers
${seatunnel.shade.package}.com.google.flatbuffers
- io.netty
- ${seatunnel.shade.package}.io.netty
+ com.fasterxml.jackson
+ ${seatunnel.shade.package}.com.fasterxml.jackson
@@ -123,7 +103,7 @@
- ${basedir}/target/seatunnel-arrow-5.0.jar
+ ${basedir}/target/seatunnel-arrow.jar
jar
optional
@@ -134,5 +114,4 @@
-
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index b2012936bd2..62924093d1b 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -70,3 +70,13 @@ jetty-util-9.4.56.v20240826.jar
jetty-util-ajax-9.4.56.v20240826.jar
javax.servlet-api-3.1.0.jar
seatunnel-jetty9-9.4.56-2.3.9-SNAPSHOT-optional.jar
+arrow-format-15.0.1.jar
+arrow-memory-core-15.0.1.jar
+arrow-memory-netty-15.0.1.jar
+arrow-vector-15.0.1.jar
+eclipse-collections-11.1.0.jar
+eclipse-collections-api-11.1.0.jar
+flatbuffers-java-23.5.26.jar
+netty-buffer-4.1.104.Final.jar
+netty-common-4.1.104.Final.jar
+seatunnel-arrow-2.3.9-SNAPSHOT-optional.jar