diff --git a/pom.xml b/pom.xml index e050018b2b8..7c433655921 100644 --- a/pom.xml +++ b/pom.xml @@ -159,6 +159,7 @@ true 3.1.4 + 15.0.1 @@ -489,6 +490,17 @@ provided + + org.apache.arrow + arrow-vector + ${arrow.version} + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + + diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml index 218ec7dd9d5..a80ca7fbcd0 100644 --- a/seatunnel-common/pom.xml +++ b/seatunnel-common/pom.xml @@ -63,6 +63,13 @@ optional + + org.apache.seatunnel + seatunnel-arrow + ${project.version} + optional + + commons-codec commons-codec diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/Converter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/Converter.java new file mode 100644 index 00000000000..4c367e14ae7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/Converter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +import java.util.Map; +import java.util.function.Function; + +public interface Converter { + + String ARRAY_KEY = "ARRAY"; + String MAP_KEY = "KEY"; + String MAP_VALUE = "VALUE"; + + Object convert(int rowIndex, T fieldVector); + + default Object convert(int rowIndex, T fieldVector, Map genericsConverters) { + throw new UnsupportedOperationException("Unsupported generics convert"); + } + + boolean support(Types.MinorType type); +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/DefaultConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/DefaultConverter.java new file mode 100644 index 00000000000..7a9b6d0a47e --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/DefaultConverter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +public class DefaultConverter implements Converter { + + @Override + public Object convert(int rowIndex, FieldVector fieldVector) { + return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex); + } + + @Override + public boolean support(Types.MinorType type) { + return false; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/FixedSizeListConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/FixedSizeListConverter.java new file mode 100644 index 00000000000..c325cc50cde --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/FixedSizeListConverter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.FixedSizeListVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class FixedSizeListConverter implements Converter { + @Override + public Object convert(int rowIndex, FixedSizeListVector fieldVector) { + return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex); + } + + @Override + public Object convert( + int rowIndex, + FixedSizeListVector fieldVector, + Map genericsConverters) { + if (fieldVector.isNull(rowIndex)) { + return null; + } + List listData = fieldVector.getObject(rowIndex); + Function converter = genericsConverters.get(ARRAY_KEY); + return listData.stream() + .map( + item -> { + if (item instanceof LocalDateTime) { + LocalDateTime localDateTime = + ((LocalDateTime) item) + .atZone(ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime(); + return converter.apply(localDateTime); + } else { + return converter.apply(item); + } + }) + .collect(Collectors.toList()); + } + + @Override + public boolean support(Types.MinorType type) { + return Types.MinorType.FIXED_SIZE_LIST == type; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/LargeListConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/LargeListConverter.java new file mode 100644 index 00000000000..ed0d6ee26a9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/LargeListConverter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.LargeListVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class LargeListConverter implements Converter { + @Override + public Object convert(int rowIndex, LargeListVector fieldVector) { + return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex); + } + + @Override + public Object convert( + int rowIndex, LargeListVector fieldVector, Map genericsConverters) { + if (fieldVector.isNull(rowIndex)) { + return null; + } + if (fieldVector.isEmpty(rowIndex)) { + return Collections.emptyList(); + } + List listData = fieldVector.getObject(rowIndex); + Function converter = genericsConverters.get(ARRAY_KEY); + return listData.stream() + .map( + item -> { + if (item instanceof LocalDateTime) { + LocalDateTime localDateTime = + ((LocalDateTime) item) + .atZone(ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime(); + return converter.apply(localDateTime); + } else { + return converter.apply(item); + } + }) + .collect(Collectors.toList()); + } + + @Override + public boolean support(Types.MinorType type) { + return Types.MinorType.LARGELIST == type; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/ListConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/ListConverter.java new file mode 100644 index 00000000000..3424f56018a --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/ListConverter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.ListVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class ListConverter implements Converter { + @Override + public Object convert(int rowIndex, ListVector fieldVector) { + return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex); + } + + @Override + public Object convert( + int rowIndex, ListVector fieldVector, Map genericsConverters) { + if (fieldVector.isNull(rowIndex)) { + return null; + } + if (fieldVector.isEmpty(rowIndex)) { + return Collections.emptyList(); + } + List listData = fieldVector.getObject(rowIndex); + Function converter = genericsConverters.get(ARRAY_KEY); + return listData.stream() + .map( + item -> { + if (item instanceof LocalDateTime) { + LocalDateTime localDateTime = + ((LocalDateTime) item) + .atZone(ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime(); + return converter.apply(localDateTime); + } else { + return converter.apply(item); + } + }) + .collect(Collectors.toList()); + } + + @Override + public boolean support(Types.MinorType type) { + return Types.MinorType.LIST == type; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/MapConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/MapConverter.java new file mode 100644 index 00000000000..18ec3c6910e --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/MapConverter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.MapVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.impl.UnionMapReader; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +public class MapConverter implements Converter { + @Override + public Object convert(int rowIndex, MapVector fieldVector) { + return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex); + } + + @Override + public Object convert( + int rowIndex, MapVector fieldVector, Map genericsConverters) { + UnionMapReader reader = fieldVector.getReader(); + reader.setPosition(rowIndex); + Map mapValue = new HashMap<>(); + Function keyConverter = genericsConverters.get(MAP_KEY); + Function valueConverter = genericsConverters.get(MAP_VALUE); + while (reader.next()) { + Object key = keyConverter.apply(processTimeZone(reader.key().readObject())); + Object value = valueConverter.apply(processTimeZone(reader.value().readObject())); + mapValue.put(key, value); + } + return mapValue; + } + + private Object processTimeZone(Object value) { + if (value instanceof LocalDateTime) { + return ((LocalDateTime) value) + .atZone(ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime(); + } else { + return value; + } + } + + @Override + public boolean support(Types.MinorType type) { + return Types.MinorType.MAP == type; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/NullConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/NullConverter.java new file mode 100644 index 00000000000..50b235ea8ec --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/NullConverter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.NullVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +public class NullConverter implements Converter { + @Override + public Object convert(int rowIndex, NullVector fieldVector) { + return null; + } + + @Override + public boolean support(Types.MinorType type) { + return Types.MinorType.NULL == type; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/StructConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/StructConverter.java new file mode 100644 index 00000000000..b74ccde554b --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/StructConverter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.StructVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Slf4j +public class StructConverter implements Converter { + @Override + public Object convert(int rowIndex, StructVector fieldVector) { + return fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex); + } + + @Override + public Object convert( + int rowIndex, StructVector fieldVector, Map genericsConverters) { + Map valueMap = fieldVector.getObject(rowIndex); + return valueMap.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> { + Optional optional = + Optional.ofNullable(genericsConverters.get(e.getKey())); + if (optional.isPresent()) { + return optional.get().apply(e.getValue()); + } else { + log.warn("No converter found for key:{}", e.getKey()); + return e.getValue(); + } + })); + } + + @Override + public boolean support(Types.MinorType type) { + return Types.MinorType.STRUCT == type; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/TimeStampMicroConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/TimeStampMicroConverter.java new file mode 100644 index 00000000000..eee0a44ee6f --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/TimeStampMicroConverter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; + +public class TimeStampMicroConverter implements Converter { + @Override + public Object convert(int rowIndex, TimeStampMicroVector fieldVector) { + if (fieldVector == null || fieldVector.isNull(rowIndex)) { + return null; + } + LocalDateTime localDateTime = fieldVector.getObject(rowIndex); + return localDateTime + .atZone(ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime(); + } + + @Override + public boolean support(Types.MinorType type) { + return Types.MinorType.TIMESTAMPMICRO == type; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/TimeStampMilliConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/TimeStampMilliConverter.java new file mode 100644 index 00000000000..3410671e808 --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/TimeStampMilliConverter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; + +public class TimeStampMilliConverter implements Converter { + @Override + public Object convert(int rowIndex, TimeStampMilliVector fieldVector) { + if (fieldVector == null || fieldVector.isNull(rowIndex)) { + return null; + } + LocalDateTime localDateTime = fieldVector.getObject(rowIndex); + return localDateTime + .atZone(ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime(); + } + + @Override + public boolean support(Types.MinorType type) { + return Types.MinorType.TIMESTAMPMILLI == type; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/TimeStampNanoConverter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/TimeStampNanoConverter.java new file mode 100644 index 00000000000..17187abba79 --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/TimeStampNanoConverter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; + +public class TimeStampNanoConverter implements Converter { + @Override + public Object convert(int rowIndex, TimeStampNanoVector fieldVector) { + if (fieldVector == null || fieldVector.isNull(rowIndex)) { + return null; + } + LocalDateTime localDateTime = fieldVector.getObject(rowIndex); + return localDateTime + .atZone(ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime(); + } + + @Override + public boolean support(Types.MinorType type) { + return Types.MinorType.TIMESTAMPNANO == type; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java new file mode 100644 index 00000000000..6bd23bad0bc --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader; + +import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator; +import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.seatunnel.shade.org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; +import org.apache.seatunnel.shade.org.apache.arrow.vector.util.Text; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter; +import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.DefaultConverter; + +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.function.Function; + +@Slf4j +public class ArrowToSeatunnelRowReader implements AutoCloseable { + + private final SeaTunnelDataType[] seaTunnelDataTypes; + private int offsetInRowBatch = 0; + private int rowCountInOneBatch = 0; + private int readRowCount = 0; + private List fieldVectors; + private VectorSchemaRoot root; + private ArrowStreamReader arrowStreamReader; + private RootAllocator rootAllocator; + private final Map fieldIndexMap = new HashMap<>(); + private final List seatunnelRowBatch = new ArrayList<>(); + private static final List converters = new ArrayList<>(); + private final DefaultConverter defaultConverter = new DefaultConverter(); + private final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss"); + private final DateTimeFormatter DATETIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + static { + ServiceLoader.load(Converter.class).forEach(converters::add); + } + + public ArrowToSeatunnelRowReader(byte[] byteArray, SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelDataTypes = seaTunnelRowType.getFieldTypes(); + initFieldIndexMap(seaTunnelRowType); + initArrowReader(byteArray); + } + + private void initFieldIndexMap(SeaTunnelRowType seaTunnelRowType) { + for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) { + fieldIndexMap.put(seaTunnelRowType.getFieldNames()[i], i); + } + } + + private void initArrowReader(byte[] byteArray) { + this.rootAllocator = new RootAllocator(Integer.MAX_VALUE); + this.arrowStreamReader = + new ArrowStreamReader(new ByteArrayInputStream(byteArray), rootAllocator); + } + + public ArrowToSeatunnelRowReader readArrow() { + try { + this.root = arrowStreamReader.getVectorSchemaRoot(); + while (arrowStreamReader.loadNextBatch()) { + this.fieldVectors = root.getFieldVectors(); + if (fieldVectors.isEmpty() || root.getRowCount() == 0) { + log.debug("one batch in arrow has no data."); + continue; + } + log.info("one batch in arrow row count size '{}'", root.getRowCount()); + this.rowCountInOneBatch = root.getRowCount(); + for (int i = 0; i < rowCountInOneBatch; i++) { + seatunnelRowBatch.add(new SeaTunnelRow(this.seaTunnelDataTypes.length)); + } + convertSeatunnelRow(); + this.readRowCount += root.getRowCount(); + } + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + close(); + } + } + + public boolean hasNext() { + return offsetInRowBatch < readRowCount; + } + + public SeaTunnelRow next() { + if (!hasNext()) { + throw new IllegalStateException("no more rows to read."); + } + return seatunnelRowBatch.get(offsetInRowBatch++); + } + + private void convertSeatunnelRow() { + for (FieldVector fieldVector : fieldVectors) { + String name = fieldVector.getField().getName(); + Integer fieldIndex = fieldIndexMap.get(name); + Types.MinorType minorType = fieldVector.getMinorType(); + for (int i = 0; i < seatunnelRowBatch.size(); i++) { + // arrow field not in the Seatunnel Sechma field, skip it + if (fieldIndex != null) { + SeaTunnelDataType seaTunnelDataType = seaTunnelDataTypes[fieldIndex]; + Object fieldValue = + convertArrowData( + readRowCount + i, minorType, fieldVector, seaTunnelDataType); + fieldValue = + convertSeatunnelRowValue( + seaTunnelDataType.getSqlType(), minorType, fieldValue); + seatunnelRowBatch.get(readRowCount + i).setField(fieldIndex, fieldValue); + } + } + } + } + + public int getReadRowCount() { + return readRowCount; + } + + private Object convertSeatunnelRowValue( + SqlType currentType, Types.MinorType minorType, Object fieldValue) { + switch (currentType) { + case STRING: + if (fieldValue instanceof byte[]) { + return new String((byte[]) fieldValue); + } else if (fieldValue instanceof Text) { + return ((Text) fieldValue).toString(); + } else { + return fieldValue; + } + case DECIMAL: + if (fieldValue instanceof String) { + return new BigDecimal((String) fieldValue); + } else if (fieldValue instanceof Text) { + return new BigDecimal(((Text) fieldValue).toString()); + } else { + return fieldValue; + } + case DATE: + if (fieldValue instanceof Integer) { + return LocalDate.ofEpochDay((Integer) fieldValue); + } else if (fieldValue instanceof Long) { + return LocalDate.ofEpochDay((Long) fieldValue); + } else if (fieldValue instanceof String) { + return LocalDate.parse((String) fieldValue, DATE_FORMATTER); + } else if (fieldValue instanceof Text) { + return LocalDate.parse(((Text) fieldValue).toString(), DATE_FORMATTER); + } else if (fieldValue instanceof LocalDateTime) { + return ((LocalDateTime) fieldValue).toLocalDate(); + } else { + return fieldValue; + } + case TIME: + if (fieldValue instanceof Integer) { + return LocalTime.ofSecondOfDay((Integer) fieldValue); + } else if (fieldValue instanceof Long) { + return Instant.ofEpochMilli((Long) fieldValue) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime() + .toLocalTime(); + } else if (fieldValue instanceof String) { + return LocalTime.parse((String) fieldValue, TIME_FORMATTER); + } else if (fieldValue instanceof Text) { + return LocalTime.parse(((Text) fieldValue).toString(), TIME_FORMATTER); + } else { + return fieldValue; + } + case TIMESTAMP: + if (fieldValue instanceof Long) { + return Instant.ofEpochMilli((Long) fieldValue) + .atZone(ZoneId.systemDefault()) + .toLocalDateTime(); + } else if (fieldValue instanceof String) { + return LocalDateTime.parse((String) fieldValue, DATETIME_FORMATTER); + } else if (fieldValue instanceof Text) { + return LocalDateTime.parse(((Text) fieldValue).toString(), DATETIME_FORMATTER); + } else { + return fieldValue; + } + default: + return fieldValue; + } + } + + private Object convertArrowData( + int rowIndex, + Types.MinorType minorType, + FieldVector fieldVector, + SeaTunnelDataType seaTunnelDataType) { + if (seaTunnelDataType == null) { + throw new IllegalArgumentException("seaTunnelDataType cannot be null"); + } + + for (Converter converter : converters) { + if (converter.support(minorType)) { + SqlType sqlType = seaTunnelDataType.getSqlType(); + switch (sqlType) { + case MAP: + return convertMap( + rowIndex, converter, fieldVector, (MapType) seaTunnelDataType); + case ARRAY: + return convertArray( + rowIndex, converter, fieldVector, (ArrayType) seaTunnelDataType); + case ROW: + return convertRow( + rowIndex, + converter, + fieldVector, + (SeaTunnelRowType) seaTunnelDataType); + default: + return converter.convert(rowIndex, fieldVector); + } + } + } + return defaultConverter.convert(rowIndex, fieldVector); + } + + private Object convertMap( + int rowIndex, Converter converter, FieldVector fieldVector, MapType mapType) { + SqlType keyType = mapType.getKeyType().getSqlType(); + SqlType valueType = mapType.getValueType().getSqlType(); + Map fieldConverters = new HashMap<>(); + fieldConverters.put(Converter.MAP_KEY, genericsConvert(keyType)); + fieldConverters.put(Converter.MAP_VALUE, genericsConvert(valueType)); + return converter.convert(rowIndex, fieldVector, fieldConverters); + } + + private Object convertArray( + int rowIndex, Converter converter, FieldVector fieldVector, ArrayType arrayType) { + SqlType elementType = arrayType.getElementType().getSqlType(); + Map fieldConverters = new HashMap<>(); + fieldConverters.put(Converter.ARRAY_KEY, genericsConvert(elementType)); + return converter.convert(rowIndex, fieldVector, fieldConverters); + } + + private Object convertRow( + int rowIndex, Converter converter, FieldVector fieldVector, SeaTunnelRowType rowType) { + String[] fieldNames = rowType.getFieldNames(); + List> fieldTypes = rowType.getChildren(); + Map fieldConverters = new HashMap<>(); + for (int i = 0; i < fieldTypes.size(); i++) { + fieldConverters.put(fieldNames[i], genericsConvert(fieldTypes.get(i).getSqlType())); + } + return converter.convert(rowIndex, fieldVector, fieldConverters); + } + + private Function genericsConvert(SqlType sqlType) { + return value -> convertSeatunnelRowValue(sqlType, null, value); + } + + @Override + public void close() { + try { + if (root != null) { + root.close(); + } + if (rootAllocator != null) { + rootAllocator.close(); + } + if (arrowStreamReader != null) { + arrowStreamReader.close(); + } + } catch (IOException e) { + throw new RuntimeException("failed to close arrow stream reader.", e); + } + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/resources/META-INF/services/org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter b/seatunnel-connectors-v2/connector-common/src/main/resources/META-INF/services/org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter new file mode 100644 index 00000000000..dfdefc680ed --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/resources/META-INF/services/org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.FixedSizeListConverter +org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.LargeListConverter +org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.ListConverter +org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.MapConverter +org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.NullConverter +org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.StructConverter +org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.TimeStampMicroConverter +org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.TimeStampMilliConverter +org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.TimeStampNanoConverter diff --git a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java new file mode 100644 index 00000000000..8c85a4f1070 --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow; + +import org.apache.seatunnel.shade.com.google.common.base.Stopwatch; +import org.apache.seatunnel.shade.io.netty.util.CharsetUtil; +import org.apache.seatunnel.shade.org.apache.arrow.memory.ArrowBuf; +import org.apache.seatunnel.shade.org.apache.arrow.memory.BufferAllocator; +import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator; +import org.apache.seatunnel.shade.org.apache.arrow.vector.BigIntVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.BitVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.DateDayVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.DateMilliVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.DecimalVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.Float4Vector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.Float8Vector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.IntVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.LargeVarCharVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.SmallIntVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeMicroVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.TinyIntVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.VarBinaryVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.VarCharVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.ListVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.impl.UnionMapWriter; +import org.apache.seatunnel.shade.org.apache.arrow.vector.holders.TimeMilliHolder; +import org.apache.seatunnel.shade.org.apache.arrow.vector.holders.VarCharHolder; +import org.apache.seatunnel.shade.org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.TimeUnit; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.pojo.Field; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.pojo.Schema; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader.ArrowToSeatunnelRowReader; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayOutputStream; +import java.math.BigDecimal; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Slf4j +public class ArrowToSeatunnelRowReaderTest { + + private static VectorSchemaRoot root; + private static RootAllocator rootAllocator; + private static final List seaTunnelDataTypeHolder = new ArrayList<>(); + + /** + * LocalDateTime.now() is timestamped with a precision of nanoseconds on linux and milliseconds + * on windows The test case uses TimeStampMicroVector to test the timestamp, thus truncating the + * timestamp accuracy to ChronoUnit.MILLIS + */ + private static final LocalDateTime localDateTime = + LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS); + + private static final List stringData = new ArrayList<>(); + private static final List byteData = new ArrayList<>(); + private static final List shortData = new ArrayList<>(); + private static final List intData = new ArrayList<>(); + private static final List longData = new ArrayList<>(); + private static final float floatData = 1.23f; + private static final double doubleData = 1.23456789d; + private static final BigDecimal decimalData = new BigDecimal("1234567.89"); + private static final List> arrayData1 = new ArrayList<>(); + private static final List> arrayData2 = new ArrayList<>(); + private static final List> mapData = new ArrayList<>(); + + @BeforeAll + public static void beforeAll() throws Exception { + rootAllocator = new RootAllocator(Long.MAX_VALUE); + root = buildVectorSchemaRoot(rootAllocator, 10, true); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("boolean", 1)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("byte", 1)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("short", 1)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("int", 1)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("long", 1)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("float", 1)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("double", 1)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("string1", 1)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("decimal", 1)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("timestamp1", 1)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("string2", 0)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("string3", 0)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("timestamp2", 0)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("time", 0)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("date1", 0)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("date2", 0)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("array1", 0)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("array2", 0)); + seaTunnelDataTypeHolder.add(new SeaTunnelDataTypeHolder("map", 0)); + } + + private static VectorSchemaRoot buildVectorSchemaRoot( + RootAllocator rootAllocator, int count, boolean allType) { + List vectors = new ArrayList<>(); + ZoneId zoneId = ZoneId.systemDefault(); + vectors.add(new BitVector("boolean", rootAllocator)); + vectors.add(new TinyIntVector("byte", rootAllocator)); + vectors.add(new SmallIntVector("short", rootAllocator)); + vectors.add(new IntVector("int", rootAllocator)); + vectors.add(new BigIntVector("long", rootAllocator)); + vectors.add(new Float4Vector("float", rootAllocator)); + vectors.add(new Float8Vector("double", rootAllocator)); + // varchar + vectors.add(new VarCharVector("string1", rootAllocator)); + vectors.add( + new DecimalVector( + Field.nullable("decimal", new ArrowType.Decimal(10, 2, 128)), + rootAllocator)); + // timestamp without timezone + vectors.add(new TimeStampMicroVector("timestamp1", rootAllocator)); + if (allType) { + // byte[] + vectors.add(new VarBinaryVector("string2", rootAllocator)); + // text + vectors.add(new LargeVarCharVector("string3", rootAllocator)); + // timestamp with timezone + vectors.add( + new TimeStampMilliTZVector( + Field.nullable( + "timestamp2", + new ArrowType.Timestamp( + TimeUnit.MILLISECOND, ZoneId.systemDefault().getId())), + rootAllocator)); + vectors.add(new TimeMicroVector("time", rootAllocator)); + vectors.add(new DateMilliVector("date1", rootAllocator)); + vectors.add(new DateDayVector("date2", rootAllocator)); + // array int + vectors.add(ListVector.empty("array1", rootAllocator)); + // array int + vectors.add(ListVector.empty("array2", rootAllocator)); + // map + } + // allocate storage + vectors.forEach(FieldVector::allocateNew); + // setVectorVaule + long epochMilli = + localDateTime + .truncatedTo(ChronoUnit.MILLIS) + .atZone(zoneId) + .toInstant() + .toEpochMilli(); + byte byteStart = 'a'; + + // setVectorValue + vectors.forEach( + vector -> { + for (int i = 0; i < count; i++) { + String stringValue = "test" + i; + if (vector instanceof BitVector) { + ((BitVector) vector).setSafe(i, i % 2 == 0 ? 0 : 1); + } else if (vector instanceof TinyIntVector) { + int i1 = byteStart + i; + byteData.add((byte) i1); + ((TinyIntVector) vector).setSafe(i, i1); + } else if (vector instanceof SmallIntVector) { + shortData.add((short) i); + ((SmallIntVector) vector).setSafe(i, i); + } else if (vector instanceof IntVector) { + intData.add(i); + ((IntVector) vector).setSafe(i, i); + } else if (vector instanceof BigIntVector) { + longData.add((long) i); + ((BigIntVector) vector).setSafe(i, i); + } else if (vector instanceof Float4Vector) { + ((Float4Vector) vector).setSafe(i, floatData); + } else if (vector instanceof Float8Vector) { + ((Float8Vector) vector).setSafe(i, doubleData); + } else if (vector instanceof DecimalVector) { + ((DecimalVector) vector).setSafe(i, decimalData); + } else if (vector instanceof VarCharVector) { + stringData.add(stringValue); + ((VarCharVector) vector) + .setSafe(i, (stringValue).getBytes(StandardCharsets.UTF_8)); + } else if (vector instanceof TimeStampMicroVector) { + ((TimeStampMicroVector) vector).setSafe(i, epochMilli * 1000); + } else if (vector instanceof VarBinaryVector) { + ((VarBinaryVector) vector) + .setSafe(i, (stringValue).getBytes(StandardCharsets.UTF_8)); + } else if (vector instanceof LargeVarCharVector) { + ((LargeVarCharVector) vector) + .setSafe(i, (stringValue).getBytes(StandardCharsets.UTF_8)); + } else if (vector instanceof TimeStampMilliTZVector) { + ((TimeStampMilliTZVector) vector).setSafe(i, epochMilli); + } else if (vector instanceof TimeMicroVector) { + ((TimeMicroVector) vector).setSafe(i, epochMilli); + } else if (vector instanceof DateMilliVector) { + ((DateMilliVector) vector).setSafe(i, epochMilli); + } else if (vector instanceof DateDayVector) { + ((DateDayVector) vector) + .setSafe(i, (int) localDateTime.toLocalDate().toEpochDay()); + } + } + }); + + // setListVectorValue + vectors.stream() + .filter(vector -> vector instanceof ListVector) + .forEach( + vector -> { + ListVector listVector = (ListVector) vector; + String name = listVector.getField().getName(); + UnionListWriter writer = listVector.getWriter(); + for (int i = 0; i < count; i++) { + writer.startList(); + writer.setPosition(i); + if ("array1".equals(name)) { + List intList = new ArrayList<>(); + for (int j = 0; j < 5; j++) { + int i1 = j + i; + writer.writeInt(i1); + intList.add(i1); + } + writer.setValueCount(5); + writer.endList(); + arrayData1.add(intList); + } + if ("array2".equals(name)) { + List dateTimeList = new ArrayList<>(); + for (int j = 0; j < 5; j++) { + writer.writeTimeStampMilliTZ(epochMilli); + dateTimeList.add(localDateTime); + } + writer.setValueCount(5); + writer.endList(); + arrayData2.add(dateTimeList); + } + } + }); + // setMapVectorValue + + // setValueCount + vectors.forEach(vector -> vector.setValueCount(count)); + List fields = + vectors.stream().map(FieldVector::getField).collect(Collectors.toList()); + Schema schema = new Schema(fields); + return new VectorSchemaRoot(schema, vectors, count); + } + + private static void writeKeyAndValue( + UnionMapWriter writer, Object value, int rowIndex, BufferAllocator allocator) { + writer.setPosition(rowIndex); + if (value instanceof String) { + byte[] bytes = ((String) value).getBytes(CharsetUtil.UTF_8); + ArrowBuf buffer = allocator.buffer(bytes.length); + buffer.writeBytes(bytes); + VarCharHolder holder = new VarCharHolder(); + holder.start = 0; + holder.buffer = buffer; + holder.end = bytes.length; + writer.write(holder); + } else if (value instanceof LocalDateTime) { + LocalDateTime dateTime = (LocalDateTime) value; + TimeMilliHolder holder = new TimeMilliHolder(); + holder.value = (int) dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + writer.write(holder); + } + } + + @Test + public void testSeatunnelRow() throws Exception { + try (ByteArrayOutputStream out = new ByteArrayOutputStream(); + ArrowStreamWriter writer = + new ArrowStreamWriter( + root, /*DictionaryProvider=*/ null, Channels.newChannel(out))) { + writer.writeBatch(); + out.flush(); + List rows = new ArrayList<>(); + try (ArrowToSeatunnelRowReader reader = + new ArrowToSeatunnelRowReader(out.toByteArray(), getSeatunnelRowType(true)) + .readArrow()) { + while (reader.hasNext()) { + rows.add(reader.next()); + } + Assertions.assertEquals(10, rows.size()); + } + // check boolean + List actualBooleanData = + rows.stream().map(s -> s.getField(0)).distinct().collect(Collectors.toList()); + Assertions.assertEquals(Arrays.asList(Boolean.FALSE, Boolean.TRUE), actualBooleanData); + // check byte + List actualByteData = + rows.stream().map(s -> s.getField(1)).distinct().collect(Collectors.toList()); + Assertions.assertEquals(byteData, actualByteData); + // check short + List actualShortData = + rows.stream().map(s -> s.getField(2)).distinct().collect(Collectors.toList()); + Assertions.assertEquals(shortData, actualShortData); + // check int + List actualIntData = + rows.stream().map(s -> s.getField(3)).distinct().collect(Collectors.toList()); + Assertions.assertEquals(intData, actualIntData); + // check long + List actualLongData = + rows.stream().map(s -> s.getField(4)).distinct().collect(Collectors.toList()); + Assertions.assertEquals(longData, actualLongData); + // check float + List actualFloatData = + rows.stream().map(s -> s.getField(5)).distinct().collect(Collectors.toList()); + Assertions.assertEquals(Collections.singletonList(floatData), actualFloatData); + // check double + List actualDoubleData = + rows.stream().map(s -> s.getField(6)).distinct().collect(Collectors.toList()); + Assertions.assertEquals(Collections.singletonList(doubleData), actualDoubleData); + // check string1 + List actualStringData = + rows.stream().map(s -> s.getField(7)).collect(Collectors.toList()); + Assertions.assertEquals(stringData, actualStringData); + // check decimal + List actualDecimalData = + rows.stream().map(s -> s.getField(8)).distinct().collect(Collectors.toList()); + Assertions.assertEquals(Collections.singletonList(decimalData), actualDecimalData); + // check timestamp without tz + List actualTimestamp1Data = + rows.stream().map(s -> s.getField(9)).distinct().collect(Collectors.toList()); + Assertions.assertEquals(Collections.singletonList(localDateTime), actualTimestamp1Data); + // check string2 + List actualString2Data = + rows.stream().map(s -> s.getField(10)).collect(Collectors.toList()); + Assertions.assertEquals(stringData, actualString2Data); + // check string3 + List actualString3Data = + rows.stream().map(s -> s.getField(11)).collect(Collectors.toList()); + Assertions.assertEquals(stringData, actualString3Data); + + // check timestamp with tz + List actualTimestamp2Data = + rows.stream().map(s -> s.getField(12)).distinct().collect(Collectors.toList()); + Assertions.assertEquals(Collections.singletonList(localDateTime), actualTimestamp2Data); + + // check time + List actualTimeDate = + rows.stream().map(s -> s.getField(13)).distinct().collect(Collectors.toList()); + Assertions.assertEquals( + Collections.singletonList(localDateTime.toLocalTime()), actualTimeDate); + // check date1 + List actualDate1Data = + rows.stream().map(s -> s.getField(14)).distinct().collect(Collectors.toList()); + Assertions.assertEquals( + Collections.singletonList(localDateTime.toLocalDate()), actualDate1Data); + // check date2 + List actualDate2Data = + rows.stream().map(s -> s.getField(15)).distinct().collect(Collectors.toList()); + Assertions.assertEquals( + Collections.singletonList(localDateTime.toLocalDate()), actualDate2Data); + // check array int + List actualArrayIntData = + rows.stream().map(s -> s.getField(16)).collect(Collectors.toList()); + Assertions.assertIterableEquals(arrayData1, actualArrayIntData); + // check array timestamp + List actualArrayTimestampData = + rows.stream().map(s -> s.getField(17)).collect(Collectors.toList()); + Assertions.assertIterableEquals(arrayData2, actualArrayTimestampData); + // todo check map + // The java api has problems building MapVectors,and there are no examples on the + // official website + // @see https://github.com/apache/arrow/issues/44664 + } + } + + @Test + public void testConvertArrowSpeed() throws Exception { + Stopwatch stopwatch = Stopwatch.createStarted(); + int count = 1000000; + try (RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); + VectorSchemaRoot vectorSchemaRoot = + buildVectorSchemaRoot(rootAllocator, count, false); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ArrowStreamWriter writer = + new ArrowStreamWriter( + vectorSchemaRoot, + /*DictionaryProvider=*/ null, + Channels.newChannel(out))) { + stopwatch.stop(); + System.out.printf( + "build %s rows vectorSchemaRoot cost %s ms \n", + count, stopwatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); + writer.writeBatch(); + out.flush(); + List rows = new ArrayList<>(); + stopwatch.reset().start(); + SeaTunnelRowType seatunnelRowType = getSeatunnelRowType(false); + try (ArrowToSeatunnelRowReader reader = + new ArrowToSeatunnelRowReader(out.toByteArray(), seatunnelRowType) + .readArrow()) { + while (reader.hasNext()) { + rows.add(reader.next()); + } + stopwatch.stop(); + System.out.printf( + "read %s rows cost %s ms ", + rows.size(), stopwatch.elapsed(java.util.concurrent.TimeUnit.MILLISECONDS)); + Assertions.assertEquals(count, rows.size()); + } + } + } + + private SeaTunnelRowType getSeatunnelRowType(boolean allType) { + String[] fieldNames = + seaTunnelDataTypeHolder.stream() + .filter(h -> allType ? h.getFlag() >= 0 : h.getFlag() == 1) + .map(SeaTunnelDataTypeHolder::getFiledName) + .toArray(String[]::new); + SeaTunnelDataType[] seaTunnelDataTypes = + seaTunnelDataTypeHolder.stream() + .filter(h -> allType ? h.getFlag() >= 0 : h.getFlag() == 1) + .map(SeaTunnelDataTypeHolder::getSeatunnelDataType) + .toArray(SeaTunnelDataType[]::new); + return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); + } + + @AfterAll + public static void afterAll() throws Exception { + try { + if (root != null) { + root.close(); + } + if (rootAllocator != null) { + rootAllocator.close(); + } + } catch (Exception e) { + throw new RuntimeException("failed to close arrow stream reader.", e); + } + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/SeaTunnelDataTypeHolder.java b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/SeaTunnelDataTypeHolder.java new file mode 100644 index 00000000000..6f1c5579da8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/SeaTunnelDataTypeHolder.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; + +public class SeaTunnelDataTypeHolder { + private final String filedName; + private final int flag; + + public SeaTunnelDataTypeHolder(String filedName, int flag) { + this.filedName = filedName; + this.flag = flag; + } + + public String getFiledName() { + return filedName; + } + + public int getFlag() { + return flag; + } + + public SeaTunnelDataType getSeatunnelDataType() { + switch (filedName) { + case "boolean": + return BasicType.BOOLEAN_TYPE; + case "byte": + return BasicType.BYTE_TYPE; + case "short": + return BasicType.SHORT_TYPE; + case "int": + return BasicType.INT_TYPE; + case "long": + return BasicType.LONG_TYPE; + case "float": + return BasicType.FLOAT_TYPE; + case "double": + return BasicType.DOUBLE_TYPE; + case "string1": + case "string2": + case "string3": + return BasicType.STRING_TYPE; + case "decimal": + return new DecimalType(10, 2); + case "timestamp1": + case "timestamp2": + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case "time": + return LocalTimeType.LOCAL_TIME_TYPE; + case "date1": + case "date2": + return LocalTimeType.LOCAL_DATE_TYPE; + case "array1": + return ArrayType.INT_ARRAY_TYPE; + case "array2": + return ArrayType.LOCAL_DATE_TIME_ARRAY_TYPE; + case "map": + return new MapType(BasicType.STRING_TYPE, LocalTimeType.LOCAL_DATE_TIME_TYPE); + default: + return null; + } + } +} diff --git a/seatunnel-connectors-v2/connector-doris/pom.xml b/seatunnel-connectors-v2/connector-doris/pom.xml index 85aafc97ad8..06cd650f4a9 100644 --- a/seatunnel-connectors-v2/connector-doris/pom.xml +++ b/seatunnel-connectors-v2/connector-doris/pom.xml @@ -82,11 +82,5 @@ - - org.apache.seatunnel - seatunnel-arrow-5.0 - ${project.version} - optional - diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java index 68d2eecfb51..e68366cf260 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java @@ -27,8 +27,8 @@ import org.apache.seatunnel.connectors.doris.rest.models.Schema; import org.apache.seatunnel.connectors.doris.source.DorisSourceTable; import org.apache.seatunnel.connectors.doris.source.serialization.Routing; -import org.apache.seatunnel.connectors.doris.source.serialization.RowBatch; import org.apache.seatunnel.connectors.doris.util.SchemaUtils; +import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader.ArrowToSeatunnelRowReader; import org.apache.doris.sdk.thrift.TScanBatchResult; import org.apache.doris.sdk.thrift.TScanCloseParams; @@ -60,12 +60,12 @@ public class DorisValueReader { protected int offset = 0; protected AtomicBoolean eos = new AtomicBoolean(false); - protected RowBatch rowBatch; + protected ArrowToSeatunnelRowReader rowBatch; // flag indicate if support deserialize Arrow to RowBatch asynchronously protected boolean deserializeArrowToRowBatchAsync; - protected BlockingQueue rowBatchBlockingQueue; + protected BlockingQueue rowBatchBlockingQueue; private TScanOpenParams openParams; protected String contextId; protected Schema schema; @@ -115,12 +115,12 @@ private BackendClient backendClient() { private TScanOpenParams openParams() { TScanOpenParams params = new TScanOpenParams(); - params.cluster = DORIS_DEFAULT_CLUSTER; - params.database = partition.getDatabase(); - params.table = partition.getTable(); + params.setCluster(DORIS_DEFAULT_CLUSTER); + params.setDatabase(partition.getDatabase()); + params.setTable(partition.getTable()); - params.tablet_ids = Arrays.asList(partition.getTabletIds().toArray(new Long[] {})); - params.opaqued_query_plan = partition.getQueryPlan(); + params.setTabletIds(Arrays.asList(partition.getTabletIds().toArray(new Long[] {}))); + params.setOpaquedQueryPlan(partition.getQueryPlan()); // max row number of one read batch Integer batchSize = dorisSourceTable.getBatchSize(); Integer queryDorisTimeout = config.getRequestQueryTimeoutS(); @@ -158,8 +158,10 @@ public void run() { TScanBatchResult nextResult = client.getNext(nextBatchParams); eos.set(nextResult.isEos()); if (!eos.get()) { - RowBatch rowBatch = - new RowBatch(nextResult, seaTunnelRowType) + ArrowToSeatunnelRowReader rowBatch = + new ArrowToSeatunnelRowReader( + nextResult.getRows(), + seaTunnelRowType) .readArrow(); offset += rowBatch.getReadRowCount(); rowBatch.close(); @@ -233,7 +235,10 @@ public boolean hasNext() { TScanBatchResult nextResult = client.getNext(nextBatchParams); eos.set(nextResult.isEos()); if (!eos.get()) { - rowBatch = new RowBatch(nextResult, seaTunnelRowType).readArrow(); + rowBatch = + new ArrowToSeatunnelRowReader( + nextResult.getRows(), seaTunnelRowType) + .readArrow(); } } hasNext = !eos.get(); diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java deleted file mode 100644 index a1f1c678aa4..00000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java +++ /dev/null @@ -1,742 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.source.serialization; - -import org.apache.seatunnel.shade.com.google.common.base.Preconditions; -import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator; -import org.apache.seatunnel.shade.org.apache.arrow.vector.BigIntVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.BitVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.DateDayVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.DecimalVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.FixedSizeBinaryVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.Float4Vector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.Float8Vector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.IntVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.SmallIntVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.TimeStampMicroVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.TinyIntVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.VarCharVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.ListVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.MapVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.StructVector; -import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.impl.UnionMapReader; -import org.apache.seatunnel.shade.org.apache.arrow.vector.ipc.ArrowStreamReader; -import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; -import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types.MinorType; -import org.apache.seatunnel.shade.org.apache.arrow.vector.util.Text; - -import org.apache.seatunnel.api.table.type.ArrayType; -import org.apache.seatunnel.api.table.type.DecimalArrayType; -import org.apache.seatunnel.api.table.type.MapType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.api.table.type.SqlType; -import org.apache.seatunnel.common.utils.DateTimeUtils; -import org.apache.seatunnel.common.utils.DateUtils; -import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; -import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; - -import org.apache.doris.sdk.thrift.TScanBatchResult; - -import lombok.extern.slf4j.Slf4j; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.function.IntFunction; - -@Slf4j -public class RowBatch { - SeaTunnelDataType[] fieldTypes; - private final ArrowStreamReader arrowStreamReader; - private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; - private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; - private final DateTimeFormatter dateTimeV2Formatter = - DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN); - private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); - // offset for iterate the rowBatch - private int offsetInRowBatch = 0; - private int rowCountInOneBatch = 0; - private int readRowCount = 0; - private List seatunnelRowBatch = new ArrayList<>(); - private VectorSchemaRoot root; - private List fieldVectors; - private RootAllocator rootAllocator; - - public RowBatch(TScanBatchResult nextResult, SeaTunnelRowType seaTunnelRowType) { - this.rootAllocator = new RootAllocator(Integer.MAX_VALUE); - this.arrowStreamReader = - new ArrowStreamReader( - new ByteArrayInputStream(nextResult.getRows()), rootAllocator); - this.offsetInRowBatch = 0; - this.fieldTypes = seaTunnelRowType.getFieldTypes(); - } - - public RowBatch readArrow() { - try { - this.root = arrowStreamReader.getVectorSchemaRoot(); - while (arrowStreamReader.loadNextBatch()) { - fieldVectors = root.getFieldVectors(); - // Adapt unique model hidden columns - for (int i = 0; i < fieldVectors.size(); i++) { - String fieldName = fieldVectors.get(i).getField().getName(); - if (fieldName.equals("__DORIS_DELETE_SIGN__")) { - fieldVectors.remove(fieldVectors.get(i)); - } - } - if (fieldVectors.size() != fieldTypes.length) { - log.error( - "Schema size '{}' is not equal to arrow field size '{}'.", - fieldVectors.size(), - fieldTypes.length); - throw new DorisConnectorException( - DorisConnectorErrorCode.ARROW_READ_FAILED, - "Load Doris data failed, schema size of fetch data is wrong."); - } - if (fieldVectors.size() == 0 || root.getRowCount() == 0) { - log.debug("One batch in arrow has no data."); - continue; - } - rowCountInOneBatch = root.getRowCount(); - // init the rowBatch - for (int i = 0; i < rowCountInOneBatch; ++i) { - seatunnelRowBatch.add((new SeaTunnelRow(fieldVectors.size()))); - } - convertArrowToRowBatch(); - readRowCount += root.getRowCount(); - } - return this; - } catch (Throwable e) { - log.error("Read Doris Data failed because: ", e); - throw new DorisConnectorException( - DorisConnectorErrorCode.ARROW_READ_FAILED, e.getMessage()); - } finally { - close(); - } - } - - public boolean hasNext() { - return offsetInRowBatch < readRowCount; - } - - private void addValueToRow(int rowIndex, int colIndex, Object obj) { - if (rowIndex > rowCountInOneBatch) { - String errMsg = - "Get row offset: " + rowIndex + " larger than row size: " + rowCountInOneBatch; - log.error(errMsg); - throw new NoSuchElementException(errMsg); - } - seatunnelRowBatch.get(readRowCount + rowIndex).setField(colIndex, obj); - } - - public void convertArrowToRowBatch() throws DorisConnectorException { - try { - for (int col = 0; col < fieldVectors.size(); col++) { - SeaTunnelDataType dataType = fieldTypes[col]; - final String currentType = dataType.getSqlType().name(); - - FieldVector fieldVector = fieldVectors.get(col); - Types.MinorType minorType = fieldVector.getMinorType(); - convertArrowValue(col, currentType, dataType, minorType, fieldVector); - } - } catch (Throwable e) { - close(); - throw e; - } - } - - private void convertArrowValue( - int col, - String currentType, - SeaTunnelDataType dataType, - MinorType minorType, - FieldVector fieldVector) { - switch (currentType) { - case "BOOLEAN": - BitVector bitVector = (BitVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.BIT), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> - bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0); - break; - case "TINYINT": - TinyIntVector tinyIntVector = (TinyIntVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.TINYINT), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> - tinyIntVector.isNull(rowIndex) - ? null - : tinyIntVector.get(rowIndex)); - break; - case "SMALLINT": - if (fieldVector instanceof BitVector) { - BitVector bv = (BitVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.BIT), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, rowIndex -> bv.isNull(rowIndex) ? null : (short) bv.get(rowIndex)); - - } else if (fieldVector instanceof TinyIntVector) { - TinyIntVector tv = (TinyIntVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(MinorType.TINYINT), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, rowIndex -> tv.isNull(rowIndex) ? null : (short) tv.get(rowIndex)); - - } else { - SmallIntVector smallIntVector = (SmallIntVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.SMALLINT), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> - smallIntVector.isNull(rowIndex) - ? null - : smallIntVector.get(rowIndex)); - } - break; - case "INT": - IntVector intVector = (IntVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.INT), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> intVector.isNull(rowIndex) ? null : intVector.get(rowIndex)); - break; - case "BIGINT": - BigIntVector bigIntVector = (BigIntVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.BIGINT), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> - bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex)); - break; - case "FLOAT": - Float4Vector float4Vector = (Float4Vector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.FLOAT4), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> - float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex)); - break; - case "DOUBLE": - Float8Vector float8Vector = (Float8Vector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.FLOAT8), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> - float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex)); - break; - case "DECIMAL": - // LARGEINT - if (fieldVector instanceof FixedSizeBinaryVector) { - FixedSizeBinaryVector largeInitFixedSizeBinaryVector = - (FixedSizeBinaryVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.FIXEDSIZEBINARY), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> { - if (largeInitFixedSizeBinaryVector.isNull(rowIndex)) { - return null; - } - byte[] bytes = largeInitFixedSizeBinaryVector.get(rowIndex); - int left = 0, right = bytes.length - 1; - while (left < right) { - byte temp = bytes[left]; - bytes[left] = bytes[right]; - bytes[right] = temp; - left++; - right--; - } - return new BigDecimal(new BigInteger(bytes), 0); - }); - break; - } else if (fieldVector instanceof VarCharVector) { - VarCharVector varCharVector = (VarCharVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.VARCHAR), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> - varCharVector.isNull(rowIndex) - ? null - : new BigDecimal( - new String(varCharVector.get(rowIndex)))); - break; - } - DecimalVector decimalVector = (DecimalVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.DECIMAL), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> - decimalVector.isNull(rowIndex) - ? null - : decimalVector.getObject(rowIndex).stripTrailingZeros()); - break; - case "DATE": - case "DATEV2": - if (fieldVector instanceof DateDayVector) { - DateDayVector dateVector = (DateDayVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.DATEDAY), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> { - if (dateVector.isNull(rowIndex)) { - return null; - } - return LocalDate.ofEpochDay(dateVector.get(rowIndex)); - }); - break; - } - VarCharVector dateVector = (VarCharVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.VARCHAR), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> { - if (dateVector.isNull(rowIndex)) { - return null; - } - String stringValue = new String(dateVector.get(rowIndex)); - return LocalDate.parse(stringValue, dateFormatter); - }); - break; - case "TIMESTAMP": - if (fieldVector instanceof TimeStampMicroVector) { - TimeStampMicroVector timestampVector = (TimeStampMicroVector) fieldVector; - - addValueToRowForAllRows( - col, - rowIndex -> { - if (timestampVector.isNull(rowIndex)) { - return null; - } - String stringValue = timestampVector.getObject(rowIndex).toString(); - stringValue = completeMilliseconds(stringValue); - - return DateTimeUtils.parse(stringValue); - }); - break; - } - VarCharVector timestampVector = (VarCharVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.VARCHAR), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> { - if (timestampVector.isNull(rowIndex)) { - return null; - } - String stringValue = new String(timestampVector.get(rowIndex)); - stringValue = completeMilliseconds(stringValue); - return LocalDateTime.parse(stringValue, dateTimeV2Formatter); - }); - break; - case "STRING": - if (fieldVector instanceof FixedSizeBinaryVector) { - FixedSizeBinaryVector fixedSizeBinaryVector = - (FixedSizeBinaryVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.FIXEDSIZEBINARY), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> { - if (fixedSizeBinaryVector.isNull(rowIndex)) { - return null; - } - byte[] bytes = fixedSizeBinaryVector.get(rowIndex); - int left = 0, right = bytes.length - 1; - while (left < right) { - byte temp = bytes[left]; - bytes[left] = bytes[right]; - bytes[right] = temp; - left++; - right--; - } - return new BigInteger(bytes).toString(); - }); - break; - } else if (fieldVector instanceof MapVector) { - MapVector mapVector = (MapVector) fieldVector; - UnionMapReader reader = mapVector.getReader(); - Preconditions.checkArgument( - minorType.equals(MinorType.MAP), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> { - if (mapVector.isNull(rowIndex)) { - return null; - } - reader.setPosition(rowIndex); - Map mapValue = new HashMap<>(); - while (reader.next()) { - mapValue.put( - reader.key().readObject().toString(), - reader.value().readObject()); - } - return mapValue.toString(); - }); - } else if (fieldVector instanceof StructVector) { - StructVector structVector = (StructVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(MinorType.STRUCT), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> { - if (structVector.isNull(rowIndex)) { - return null; - } - Map structValue = structVector.getObject(rowIndex); - return structValue.toString(); - }); - } else if (fieldVector instanceof ListVector) { - ListVector listVector = (ListVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.LIST), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> { - if (listVector.isNull(rowIndex)) { - return null; - } - List listVectorObject = listVector.getObject(rowIndex); - return Arrays.toString(listVectorObject.toArray()); - }); - } else { - VarCharVector varCharVector = (VarCharVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.VARCHAR), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> - varCharVector.isNull(rowIndex) - ? null - : new String(varCharVector.get(rowIndex))); - } - break; - case "ARRAY": - ListVector listVector = (ListVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(Types.MinorType.LIST), - typeMismatchMessage(currentType, minorType)); - addValueToRowForArrayColumn(dataType, col, listVector); - break; - case "MAP": - MapVector mapVector = (MapVector) fieldVector; - UnionMapReader reader = mapVector.getReader(); - Preconditions.checkArgument( - minorType.equals(MinorType.MAP), - typeMismatchMessage(currentType, minorType)); - addValueToRowForMapColumn(dataType, col, mapVector, reader); - - break; - case "STRUCT": - StructVector structVector = (StructVector) fieldVector; - Preconditions.checkArgument( - minorType.equals(MinorType.STRUCT), - typeMismatchMessage(currentType, minorType)); - addValueToRowForAllRows( - col, - rowIndex -> { - if (structVector.isNull(rowIndex)) { - return null; - } - Map structValue = structVector.getObject(rowIndex); - return structValue; - }); - break; - default: - String errMsg = "Unsupported type " + fieldTypes[col].getSqlType().name(); - log.error(errMsg); - throw new DorisConnectorException( - DorisConnectorErrorCode.ARROW_READ_FAILED, errMsg); - } - } - - private void addValueToRowForMapColumn( - SeaTunnelDataType dataType, int col, MapVector mapVector, UnionMapReader reader) { - addValueToRowForAllRows( - col, - rowIndex -> { - if (mapVector.isNull(rowIndex)) { - return null; - } - reader.setPosition(rowIndex); - Map mapValue = new HashMap<>(); - MapType mapType = (MapType) dataType; - SqlType keyType = mapType.getKeyType().getSqlType(); - SqlType valueType = mapType.getValueType().getSqlType(); - while (reader.next()) { - mapValue.put( - getDataFromVector(reader.key().readObject(), keyType), - getDataFromVector(reader.value().readObject(), valueType)); - } - return mapValue; - }); - } - - private Object getDataFromVector(Object vectorObject, SqlType sqlType) { - if (vectorObject instanceof Boolean) { - return Boolean.valueOf(vectorObject.toString()); - } - - if (vectorObject instanceof Byte) { - return Byte.valueOf(vectorObject.toString()); - } - - if (vectorObject instanceof Short) { - return Short.valueOf(vectorObject.toString()); - } - - if (vectorObject instanceof Integer) { - if (sqlType.equals(SqlType.DATE)) { - return LocalDate.ofEpochDay((int) vectorObject); - } - return Integer.valueOf(vectorObject.toString()); - } - - if (vectorObject instanceof Long) { - return Long.valueOf(vectorObject.toString()); - } - - if (vectorObject instanceof Float) { - return Float.valueOf(vectorObject.toString()); - } - - if (vectorObject instanceof Double) { - return Double.valueOf(vectorObject.toString()); - } - - if (vectorObject instanceof Text) { - if (sqlType.equals(SqlType.TIMESTAMP)) { - String stringValue = completeMilliseconds(vectorObject.toString()); - return LocalDateTime.parse(stringValue, dateTimeV2Formatter); - } else if (sqlType.equals(SqlType.DATE)) { - return LocalDate.parse(vectorObject.toString(), dateFormatter); - } else if (sqlType.equals(SqlType.DECIMAL)) { - return new BigDecimal(vectorObject.toString()); - } - return vectorObject.toString(); - } - - if (vectorObject instanceof BigDecimal) { - return new BigDecimal(vectorObject.toString()); - } - - if (vectorObject instanceof byte[] && sqlType.equals(SqlType.DECIMAL)) { - byte[] bytes = (byte[]) vectorObject; - int left = 0, right = bytes.length - 1; - while (left < right) { - byte temp = bytes[left]; - bytes[left] = bytes[right]; - bytes[right] = temp; - left++; - right--; - } - return new BigDecimal(new BigInteger(bytes), 0); - } - if (vectorObject instanceof LocalDate) { - return DateUtils.parse(vectorObject.toString()); - } - - if (vectorObject instanceof LocalDateTime) { - return DateTimeUtils.parse(vectorObject.toString()); - } - - return vectorObject.toString(); - } - - private void addValueToRowForArrayColumn( - SeaTunnelDataType dataType, int col, ListVector listVector) { - SqlType eleSqlType = null; - if (dataType instanceof ArrayType) { - ArrayType arrayType = (ArrayType) dataType; - eleSqlType = arrayType.getElementType().getSqlType(); - } - SqlType finalEleSqlType = eleSqlType; - addValueToRowForAllRows( - col, - rowIndex -> { - if (listVector.isNull(rowIndex)) { - return null; - } - List listVectorObject = listVector.getObject(rowIndex); - if (listVectorObject.get(0) instanceof Boolean) { - return listVectorObject.stream() - .map(x -> getDataFromVector(x, finalEleSqlType)) - .toArray(Boolean[]::new); - } - - if (listVectorObject.get(0) instanceof Byte) { - return listVectorObject.stream() - .map(x -> getDataFromVector(x, finalEleSqlType)) - .toArray(Byte[]::new); - } - - if (listVectorObject.get(0) instanceof Short) { - return listVectorObject.stream() - .map(x -> getDataFromVector(x, finalEleSqlType)) - .toArray(Short[]::new); - } - - if (listVectorObject.get(0) instanceof Integer) { - return listVectorObject.stream() - .map(x -> getDataFromVector(x, finalEleSqlType)) - .toArray(Integer[]::new); - } - - if (listVectorObject.get(0) instanceof Long) { - return listVectorObject.stream() - .map(x -> getDataFromVector(x, finalEleSqlType)) - .toArray(Long[]::new); - } - - if (listVectorObject.get(0) instanceof Float) { - return listVectorObject.stream() - .map(x -> getDataFromVector(x, finalEleSqlType)) - .toArray(Float[]::new); - } - - if (listVectorObject.get(0) instanceof Double) { - return listVectorObject.stream() - .map(x -> getDataFromVector(x, finalEleSqlType)) - .toArray(Double[]::new); - } - - if (listVectorObject.get(0) instanceof Text) { - return listVectorObject.stream() - .map(x -> getDataFromVector(x, finalEleSqlType)) - .toArray(String[]::new); - } - - if (listVectorObject.get(0) instanceof BigDecimal) { - return listVectorObject.stream() - .map(x -> getDataFromVector(x, finalEleSqlType)) - .toArray(BigDecimal[]::new); - } - - if (listVectorObject.get(0) instanceof byte[] - && dataType instanceof DecimalArrayType) { - return listVectorObject.stream() - .map(x -> getDataFromVector(x, finalEleSqlType)) - .toArray(BigDecimal[]::new); - } - - return listVectorObject.toArray(); - }); - } - - private void addValueToRowForAllRows(int col, IntFunction function) { - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - Object fieldValue = function.apply(rowIndex); - addValueToRow(rowIndex, col, fieldValue); - } - } - - private String completeMilliseconds(String stringValue) { - if (stringValue.length() == DATETIMEV2_PATTERN.length()) { - return stringValue; - } - StringBuilder sb = new StringBuilder(stringValue); - if (stringValue.length() == DATETIME_PATTERN.length()) { - sb.append("."); - } - while (sb.toString().length() < DATETIMEV2_PATTERN.length()) { - sb.append(0); - } - return sb.toString(); - } - - public SeaTunnelRow next() { - if (!hasNext()) { - String errMsg = - "Get row offset:" + offsetInRowBatch + " larger than row size: " + readRowCount; - log.error(errMsg); - throw new NoSuchElementException(errMsg); - } - return seatunnelRowBatch.get(offsetInRowBatch++); - } - - private String typeMismatchMessage(final String flinkType, final Types.MinorType arrowType) { - final String messageTemplate = "FLINK type is %1$s, but arrow type is %2$s."; - return String.format(messageTemplate, flinkType, arrowType.name()); - } - - public int getReadRowCount() { - return readRowCount; - } - - public void close() { - try { - if (arrowStreamReader != null) { - arrowStreamReader.close(); - } - if (rootAllocator != null) { - rootAllocator.close(); - } - } catch (IOException ioe) { - throw new DorisConnectorException( - DorisConnectorErrorCode.ROW_BATCH_GET_FAILED, - "Failed to close ArrowStreamReader", - ioe); - } - } -} diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml index 08e49bc0f02..6653610de4e 100644 --- a/seatunnel-connectors-v2/connector-starrocks/pom.xml +++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml @@ -55,11 +55,6 @@ ${mysql.version} provided - - org.apache.seatunnel - seatunnel-common - ${project.version} - org.apache.httpcomponents httpclient @@ -75,52 +70,6 @@ starrocks-thrift-sdk ${starrocks.thrift.sdk.version} - - org.apache.arrow - arrow-vector - ${arrow.version} - - - org.apache.arrow - arrow-memory-netty - ${arrow.version} - - - - - org.apache.maven.plugins - maven-shade-plugin - - - - org.apache.arrow - ${seatunnel.shade.package}.${connector.name}.org.apache.arrow - - - io.netty - ${seatunnel.shade.package}.${connector.name}.io.netty - - - com.google.flatbuffers - ${seatunnel.shade.package}.${connector.name}.com.google.flatbuffers - - - com.fasterxml.jackson - ${seatunnel.shade.package}.${connector.name}.com.fasterxml.jackson - - - - - - - shade - - package - - - - - diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java index 07a5a03eba7..c0be0106bb0 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.source.arrow.reader.ArrowToSeatunnelRowReader; import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode; @@ -56,7 +57,7 @@ public class StarRocksBeReadClient implements Serializable { private int readerOffset = 0; private final SourceConfig sourceConfig; private SeaTunnelRowType seaTunnelRowType; - private StarRocksRowBatchReader rowBatch; + private ArrowToSeatunnelRowReader rowBatch; protected AtomicBoolean eos = new AtomicBoolean(false); public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) { @@ -162,7 +163,10 @@ public boolean hasNext() { } eos.set(result.isEos()); if (!eos.get()) { - rowBatch = new StarRocksRowBatchReader(result, seaTunnelRowType).readArrow(); + + rowBatch = + new ArrowToSeatunnelRowReader(result.getRows(), seaTunnelRowType) + .readArrow(); } } catch (TException e) { throw new StarRocksConnectorException( diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksRowBatchReader.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksRowBatchReader.java deleted file mode 100644 index 2ea7e98c7e5..00000000000 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksRowBatchReader.java +++ /dev/null @@ -1,357 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.seatunnel.connectors.seatunnel.starrocks.client.source; - -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode; -import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException; - -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.BigIntVector; -import org.apache.arrow.vector.BitVector; -import org.apache.arrow.vector.DecimalVector; -import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.Float4Vector; -import org.apache.arrow.vector.Float8Vector; -import org.apache.arrow.vector.IntVector; -import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TinyIntVector; -import org.apache.arrow.vector.VarCharVector; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.ipc.ArrowStreamReader; -import org.apache.arrow.vector.types.Types; - -import com.starrocks.thrift.TScanBatchResult; -import lombok.extern.slf4j.Slf4j; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.math.BigDecimal; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; - -@Slf4j -public class StarRocksRowBatchReader { - - private SeaTunnelDataType[] seaTunnelDataTypes; - private int offsetInRowBatch = 0; - private int rowCountInOneBatch = 0; - private int readRowCount = 0; - private List seaTunnelRowBatch = new ArrayList<>(); - private final ArrowStreamReader arrowStreamReader; - private VectorSchemaRoot root; - private List fieldVectors; - private RootAllocator rootAllocator; - private final DateTimeFormatter dateTimeFormatter = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); - - public StarRocksRowBatchReader(TScanBatchResult nextResult, SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelDataTypes = seaTunnelRowType.getFieldTypes(); - this.rootAllocator = new RootAllocator(Integer.MAX_VALUE); - this.arrowStreamReader = - new ArrowStreamReader( - new ByteArrayInputStream(nextResult.getRows()), rootAllocator); - } - - public StarRocksRowBatchReader readArrow() { - try { - this.root = arrowStreamReader.getVectorSchemaRoot(); - while (arrowStreamReader.loadNextBatch()) { - fieldVectors = root.getFieldVectors(); - if (fieldVectors.size() != seaTunnelDataTypes.length) { - log.error( - "seaTunnel schema size '{}' is not equal to arrow field size '{}'.", - fieldVectors.size(), - seaTunnelDataTypes.length); - throw new StarRocksConnectorException( - StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED, - "schema size of fetch data is wrong."); - } - if (fieldVectors.size() == 0 || root.getRowCount() == 0) { - log.debug("one batch in arrow has no data."); - continue; - } - log.info("one batch in arrow row count size '{}'", root.getRowCount()); - rowCountInOneBatch = root.getRowCount(); - // init the rowBatch - for (int i = 0; i < rowCountInOneBatch; ++i) { - seaTunnelRowBatch.add(new SeaTunnelRow(fieldVectors.size())); - } - - convertArrowToRowBatch(); - readRowCount += root.getRowCount(); - } - return this; - } catch (Exception e) { - throw new StarRocksConnectorException( - StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED, e); - } finally { - close(); - } - } - - public boolean hasNext() { - if (offsetInRowBatch < readRowCount) { - return true; - } - return false; - } - - private void addValueToRow(int rowIndex, int colIndex, Object obj) { - if (rowIndex > rowCountInOneBatch) { - throw new StarRocksConnectorException( - StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED, - String.format( - "Get row offset: %d larger than row size: %d", - rowIndex, rowCountInOneBatch)); - } - seaTunnelRowBatch.get(readRowCount + rowIndex).setField(colIndex, obj); - } - - public void convertArrowToRowBatch() { - try { - for (int col = 0; col < fieldVectors.size(); col++) { - SeaTunnelDataType dataType = seaTunnelDataTypes[col]; - final String currentType = dataType.getSqlType().name(); - - FieldVector curFieldVector = fieldVectors.get(col); - Types.MinorType mt = curFieldVector.getMinorType(); - switch (dataType.getSqlType()) { - case BOOLEAN: - checkArgument( - mt.equals(Types.MinorType.BIT), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - BitVector bitVector = (BitVector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - Object fieldValue = - bitVector.isNull(rowIndex) - ? null - : bitVector.get(rowIndex) != 0; - addValueToRow(rowIndex, col, fieldValue); - } - break; - case TINYINT: - checkArgument( - mt.equals(Types.MinorType.TINYINT), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - TinyIntVector tinyIntVector = (TinyIntVector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - Object fieldValue = - tinyIntVector.isNull(rowIndex) - ? null - : tinyIntVector.get(rowIndex); - addValueToRow(rowIndex, col, fieldValue); - } - break; - case SMALLINT: - checkArgument( - mt.equals(Types.MinorType.SMALLINT), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - SmallIntVector smallIntVector = (SmallIntVector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - Object fieldValue = - smallIntVector.isNull(rowIndex) - ? null - : smallIntVector.get(rowIndex); - addValueToRow(rowIndex, col, fieldValue); - } - break; - case INT: - checkArgument( - mt.equals(Types.MinorType.INT), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - IntVector intVector = (IntVector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - Object fieldValue = - intVector.isNull(rowIndex) ? null : intVector.get(rowIndex); - addValueToRow(rowIndex, col, fieldValue); - } - break; - case BIGINT: - checkArgument( - mt.equals(Types.MinorType.BIGINT), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - BigIntVector bigIntVector = (BigIntVector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - Object fieldValue = - bigIntVector.isNull(rowIndex) - ? null - : bigIntVector.get(rowIndex); - addValueToRow(rowIndex, col, fieldValue); - } - break; - case FLOAT: - checkArgument( - mt.equals(Types.MinorType.FLOAT4), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - Float4Vector float4Vector = (Float4Vector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - Object fieldValue = - float4Vector.isNull(rowIndex) - ? null - : float4Vector.get(rowIndex); - addValueToRow(rowIndex, col, fieldValue); - } - break; - case DOUBLE: - checkArgument( - mt.equals(Types.MinorType.FLOAT8), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - Float8Vector float8Vector = (Float8Vector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - Object fieldValue = - float8Vector.isNull(rowIndex) - ? null - : float8Vector.get(rowIndex); - addValueToRow(rowIndex, col, fieldValue); - } - break; - case DECIMAL: - checkArgument( - mt.equals(Types.MinorType.DECIMAL), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - DecimalVector decimalVector = (DecimalVector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - if (decimalVector.isNull(rowIndex)) { - addValueToRow(rowIndex, col, null); - continue; - } - BigDecimal value = decimalVector.getObject(rowIndex); - addValueToRow(rowIndex, col, value); - } - break; - case DATE: - checkArgument( - mt.equals(Types.MinorType.VARCHAR), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - VarCharVector date = (VarCharVector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - if (date.isNull(rowIndex)) { - addValueToRow(rowIndex, col, null); - continue; - } - String value = new String(date.get(rowIndex)); - LocalDate localDate = LocalDate.parse(value, dateFormatter); - addValueToRow(rowIndex, col, localDate); - } - break; - case TIMESTAMP: - checkArgument( - mt.equals(Types.MinorType.VARCHAR), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - VarCharVector timeStampSecVector = (VarCharVector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - if (timeStampSecVector.isNull(rowIndex)) { - addValueToRow(rowIndex, col, null); - continue; - } - String value = new String(timeStampSecVector.get(rowIndex)); - LocalDateTime parse = LocalDateTime.parse(value, dateTimeFormatter); - addValueToRow(rowIndex, col, parse); - } - break; - case STRING: - checkArgument( - mt.equals(Types.MinorType.VARCHAR), - "seaTunnel type is %1$s, but arrow type is %2$s.", - currentType, - mt.name()); - VarCharVector varCharVector = (VarCharVector) curFieldVector; - for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - if (varCharVector.isNull(rowIndex)) { - addValueToRow(rowIndex, col, null); - continue; - } - String value = new String(varCharVector.get(rowIndex)); - addValueToRow(rowIndex, col, value); - } - break; - default: - throw new StarRocksConnectorException( - StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED, - String.format( - "Unsupported type %s", - seaTunnelDataTypes[col].getSqlType().name())); - } - } - } catch (Exception e) { - close(); - throw new StarRocksConnectorException( - StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED, e); - } - } - - public SeaTunnelRow next() { - if (!hasNext()) { - throw new StarRocksConnectorException( - StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED, - String.format( - "Get row offset: %d larger than row size: %d", - offsetInRowBatch, readRowCount)); - } - return seaTunnelRowBatch.get(offsetInRowBatch++); - } - - public int getReadRowCount() { - return readRowCount; - } - - public void close() { - try { - if (arrowStreamReader != null) { - arrowStreamReader.close(); - } - if (rootAllocator != null) { - rootAllocator.close(); - } - } catch (IOException e) { - throw new StarRocksConnectorException( - StarRocksConnectorErrorCode.READER_ARROW_DATA_FAILED, - "Failed to close ArrowStreamReader", - e); - } - } -} diff --git a/seatunnel-shade/pom.xml b/seatunnel-shade/pom.xml index 2b378739de7..0011f6b37d2 100644 --- a/seatunnel-shade/pom.xml +++ b/seatunnel-shade/pom.xml @@ -30,13 +30,12 @@ seatunnel-hadoop3-3.1.4-uber seatunnel-jackson seatunnel-guava - seatunnel-arrow-5.0 seatunnel-thrift-service seatunnel-hazelcast seatunnel-janino seatunnel-jetty9-9.4.56 seatunnel-hadoop-aws - + seatunnel-arrow diff --git a/seatunnel-shade/seatunnel-arrow-5.0/pom.xml b/seatunnel-shade/seatunnel-arrow/pom.xml similarity index 78% rename from seatunnel-shade/seatunnel-arrow-5.0/pom.xml rename to seatunnel-shade/seatunnel-arrow/pom.xml index f0d1f2b32b5..27b49ef731c 100644 --- a/seatunnel-shade/seatunnel-arrow-5.0/pom.xml +++ b/seatunnel-shade/seatunnel-arrow/pom.xml @@ -22,11 +22,11 @@ ${revision} - seatunnel-arrow-5.0 - SeaTunnel : Shade : Arrow5.0 + seatunnel-arrow + SeaTunnel : Shade : Arrow - 5.0.0 + 15.0.1 @@ -39,36 +39,12 @@ org.apache.arrow arrow-memory-netty ${arrow.version} - - - com.fasterxml.jackson.core - jackson-annotations - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-databind - - - io.netty - netty-common - - - - - io.netty - netty-common - 4.1.77.Final - org.apache.maven.plugins maven-shade-plugin @@ -77,8 +53,8 @@ package - seatunnel-arrow-5.0 - ${enableSourceJarCreation} + seatunnel-arrow + true true false false @@ -97,13 +73,17 @@ org.apache.arrow ${seatunnel.shade.package}.org.apache.arrow + + io.netty + ${seatunnel.shade.package}.io.netty + com.google.flatbuffers ${seatunnel.shade.package}.com.google.flatbuffers - io.netty - ${seatunnel.shade.package}.io.netty + com.fasterxml.jackson + ${seatunnel.shade.package}.com.fasterxml.jackson @@ -123,7 +103,7 @@ - ${basedir}/target/seatunnel-arrow-5.0.jar + ${basedir}/target/seatunnel-arrow.jar jar optional @@ -134,5 +114,4 @@ - diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index b2012936bd2..62924093d1b 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -70,3 +70,13 @@ jetty-util-9.4.56.v20240826.jar jetty-util-ajax-9.4.56.v20240826.jar javax.servlet-api-3.1.0.jar seatunnel-jetty9-9.4.56-2.3.9-SNAPSHOT-optional.jar +arrow-format-15.0.1.jar +arrow-memory-core-15.0.1.jar +arrow-memory-netty-15.0.1.jar +arrow-vector-15.0.1.jar +eclipse-collections-11.1.0.jar +eclipse-collections-api-11.1.0.jar +flatbuffers-java-23.5.26.jar +netty-buffer-4.1.104.Final.jar +netty-common-4.1.104.Final.jar +seatunnel-arrow-2.3.9-SNAPSHOT-optional.jar