From e56e7cb9065c5148db2c42902cb87b14fbdcb5fc Mon Sep 17 00:00:00 2001 From: zhangdonghao Date: Tue, 3 Dec 2024 20:04:26 +0800 Subject: [PATCH] [Feature][core] support arrow transfers data to SeatunnelRow in arrow format --- .../converter/FixedSizeListConverter.java | 4 +++- .../reader/ArrowToSeatunnelRowReader.java | 24 +++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/FixedSizeListConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/FixedSizeListConverter.java index 7dcc9b30eb4..c325cc50cde 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/FixedSizeListConverter.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/FixedSizeListConverter.java @@ -36,7 +36,9 @@ public Object convert(int rowIndex, FixedSizeListVector fieldVector) { @Override public Object convert( - int rowIndex, FixedSizeListVector fieldVector, Map genericsConverters) { + int rowIndex, + FixedSizeListVector fieldVector, + Map genericsConverters) { if (fieldVector.isNull(rowIndex)) { return null; } diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java index 8a5c4f575aa..6bd23bad0bc 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java @@ -17,7 +17,13 @@ package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader; -import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator; +import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.seatunnel.shade.org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; +import org.apache.seatunnel.shade.org.apache.arrow.vector.util.Text; + import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.MapType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -26,12 +32,8 @@ import org.apache.seatunnel.api.table.type.SqlType; import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter; import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.DefaultConverter; -import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator; -import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.seatunnel.shade.org.apache.arrow.vector.ipc.ArrowStreamReader; -import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; -import org.apache.seatunnel.shade.org.apache.arrow.vector.util.Text; + +import lombok.extern.slf4j.Slf4j; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -257,10 +259,7 @@ private Object convertMap( Map fieldConverters = new HashMap<>(); fieldConverters.put(Converter.MAP_KEY, genericsConvert(keyType)); fieldConverters.put(Converter.MAP_VALUE, genericsConvert(valueType)); - return converter.convert( - rowIndex, - fieldVector, - fieldConverters); + return converter.convert(rowIndex, fieldVector, fieldConverters); } private Object convertArray( @@ -268,8 +267,7 @@ private Object convertArray( SqlType elementType = arrayType.getElementType().getSqlType(); Map fieldConverters = new HashMap<>(); fieldConverters.put(Converter.ARRAY_KEY, genericsConvert(elementType)); - return converter.convert( - rowIndex, fieldVector, fieldConverters); + return converter.convert(rowIndex, fieldVector, fieldConverters); } private Object convertRow(