Skip to content

Commit

Permalink
[Feature][core] support arrow transfers data to SeatunnelRow in arrow…
Browse files Browse the repository at this point in the history
… format
  • Loading branch information
hawk9821 committed Dec 4, 2024
1 parent 8b52d97 commit e56e7cb
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public Object convert(int rowIndex, FixedSizeListVector fieldVector) {

@Override
public Object convert(
int rowIndex, FixedSizeListVector fieldVector, Map<String, Function> genericsConverters) {
int rowIndex,
FixedSizeListVector fieldVector,
Map<String, Function> genericsConverters) {
if (fieldVector.isNull(rowIndex)) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader;

import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator;
import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.seatunnel.shade.org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types;
import org.apache.seatunnel.shade.org.apache.arrow.vector.util.Text;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -26,12 +32,8 @@
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter;
import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.DefaultConverter;
import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator;
import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector;
import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.seatunnel.shade.org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types;
import org.apache.seatunnel.shade.org.apache.arrow.vector.util.Text;

import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -257,19 +259,15 @@ private Object convertMap(
Map<String, Function> fieldConverters = new HashMap<>();
fieldConverters.put(Converter.MAP_KEY, genericsConvert(keyType));
fieldConverters.put(Converter.MAP_VALUE, genericsConvert(valueType));
return converter.convert(
rowIndex,
fieldVector,
fieldConverters);
return converter.convert(rowIndex, fieldVector, fieldConverters);
}

private Object convertArray(
int rowIndex, Converter converter, FieldVector fieldVector, ArrayType arrayType) {
SqlType elementType = arrayType.getElementType().getSqlType();
Map<String, Function> fieldConverters = new HashMap<>();
fieldConverters.put(Converter.ARRAY_KEY, genericsConvert(elementType));
return converter.convert(
rowIndex, fieldVector, fieldConverters);
return converter.convert(rowIndex, fieldVector, fieldConverters);
}

private Object convertRow(
Expand Down

0 comments on commit e56e7cb

Please sign in to comment.