diff --git a/pom.xml b/pom.xml
index 8325bcf9e93bc..bfbd0f63d963e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -192,6 +192,7 @@
redis-hbo-provider
presto-singlestore
presto-hana
+ presto-base-arrow-flight
diff --git a/presto-base-arrow-flight/pom.xml b/presto-base-arrow-flight/pom.xml
new file mode 100644
index 0000000000000..daa797a2d2fbb
--- /dev/null
+++ b/presto-base-arrow-flight/pom.xml
@@ -0,0 +1,409 @@
+
+
+ 4.0.0
+
+ com.facebook.presto
+ presto-root
+ 0.289-SNAPSHOT
+
+ presto-base-arrow-flight
+ presto-base-arrow-flight
+ arrow-flight Connector Plugin for Presto
+
+
+ ${project.parent.basedir}
+ 1.63.0
+ 4.10.0
+ 17.0.0
+ 4.1.100.Final
+
+
+
+
+
+ com.facebook.airlift
+ bootstrap
+
+
+ ch.qos.logback
+ logback-core
+
+
+
+
+
+ com.facebook.airlift
+ log
+
+
+
+ com.google.guava
+ guava
+
+
+ org.checkerframework
+ checker-qual
+
+
+ com.google.errorprone
+ error_prone_annotations
+
+
+ com.google.j2objc
+ j2objc-annotations
+
+
+
+
+
+ javax.inject
+ javax.inject
+
+
+
+ com.facebook.presto
+ presto-spi
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ com.facebook.presto
+ presto-common
+ provided
+
+
+
+ io.airlift
+ units
+ provided
+
+
+
+ com.google.code.findbugs
+ jsr305
+ true
+
+
+
+ org.apache.arrow
+ arrow-memory-core
+ ${arrow.version}
+
+
+
+ com.google.inject
+ guice
+
+
+
+ com.facebook.airlift
+ configuration
+
+
+
+ io.netty
+ netty-codec-http2
+ ${netty.version}
+
+
+
+ io.netty
+ netty-handler-proxy
+ ${netty.version}
+
+
+ io.netty
+ netty-codec-http
+
+
+
+
+
+ io.netty
+ netty-tcnative-boringssl-static
+ 2.0.65.Final
+
+
+
+ org.apache.arrow
+ flight-core
+ ${arrow.version}
+
+
+ io.netty
+ netty-codec-http2
+
+
+
+ io.netty
+ netty-handler-proxy
+
+
+
+ io.netty
+ netty-common
+
+
+
+ io.netty
+ netty-buffer
+
+
+
+ io.netty
+ netty-handler
+
+
+
+ io.netty
+ netty-codec
+
+
+
+ io.netty
+ netty-transport
+
+
+
+ com.google.j2objc
+ j2objc-annotations
+
+
+
+ io.netty
+ netty-transport-native-unix-common
+
+
+
+
+
+
+ org.apache.arrow
+ arrow-vector
+ ${arrow.version}
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+
+
+
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+ com.facebook.airlift
+ json
+ test
+
+
+
+ org.mockito
+ mockito-core
+ 5.11.0
+
+
+ org.objenesis
+ objenesis
+
+
+ test
+
+
+
+ com.facebook.presto
+ presto-testng-services
+ test
+
+
+
+ com.facebook.airlift
+ testing
+ test
+ ${dep.airlift.version}
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ provided
+
+
+
+
+
+
+ org.codehaus.mojo
+ animal-sniffer-annotations
+ 1.23
+
+
+
+ com.google.j2objc
+ j2objc-annotations
+ 1.3
+
+
+
+ com.google.errorprone
+ error_prone_annotations
+ 2.14.0
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.25.1
+
+
+
+ io.grpc
+ grpc-protobuf
+ 1.63.0
+
+
+
+
+ io.grpc
+ grpc-stub
+ 1.63.0
+
+
+
+ io.grpc
+ grpc-core
+ 1.63.0
+
+
+
+ io.grpc
+ grpc-api
+ 1.63.0
+
+
+
+ io.grpc
+ grpc-context
+ 1.63.0
+
+
+
+ commons-codec
+ commons-codec
+ 1.17.0
+
+
+
+ io.netty
+ netty-transport-native-unix-common
+ 4.1.100.Final
+
+
+
+ io.netty
+ netty-common
+ 4.1.100.Final
+
+
+
+ io.netty
+ netty-buffer
+ 4.1.100.Final
+
+
+
+
+ io.netty
+ netty-handler
+ 4.1.100.Final
+
+
+
+ io.netty
+ netty-transport
+ 4.1.100.Final
+
+
+
+ io.netty
+ netty-codec
+ 4.1.100.Final
+
+
+
+ org.slf4j
+ slf4j-api
+ 2.0.13
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+
+
+
+
+ com.google.errorprone:error_prone_annotations
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ io.netty:netty-codec-http2
+ io.netty:netty-handler-proxy
+ io.netty:netty-tcnative-boringssl-static
+
+
+
+
+ org.basepom.maven
+ duplicate-finder-maven-plugin
+ 1.2.1
+
+
+ module-info
+ META-INF.versions.9.module-info
+
+
+ arrow-git.properties
+
+
+
+
+
+ check
+
+
+
+
+
+
+
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractFlightRequest.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractFlightRequest.java
new file mode 100644
index 0000000000000..1336676b1582f
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractFlightRequest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import java.util.Optional;
+
+public abstract class ArrowAbstractFlightRequest
+ implements ArrowFlightRequest
+{
+ private final String schema;
+ private final String table;
+ private final Optional query;
+
+ public ArrowAbstractFlightRequest(String schema)
+ {
+ this.schema = schema;
+ this.query = Optional.empty();
+ this.table = null;
+ }
+
+ public ArrowAbstractFlightRequest(String schema, String table)
+ {
+ this.schema = schema;
+ this.table = table;
+ query = Optional.empty();
+ }
+
+ public ArrowAbstractFlightRequest(String schema, String table, Optional query)
+ {
+ this.schema = schema;
+ this.table = table;
+ this.query = query;
+ }
+
+ public String getSchema()
+ {
+ return schema;
+ }
+
+ public String getTable()
+ {
+ return table;
+ }
+
+ public Optional getQuery()
+ {
+ return query;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java
new file mode 100644
index 0000000000000..2769d057a13b3
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.common.type.BigintType;
+import com.facebook.presto.common.type.BooleanType;
+import com.facebook.presto.common.type.DateType;
+import com.facebook.presto.common.type.DecimalType;
+import com.facebook.presto.common.type.DoubleType;
+import com.facebook.presto.common.type.IntegerType;
+import com.facebook.presto.common.type.RealType;
+import com.facebook.presto.common.type.SmallintType;
+import com.facebook.presto.common.type.TimeType;
+import com.facebook.presto.common.type.TimestampType;
+import com.facebook.presto.common.type.TinyintType;
+import com.facebook.presto.common.type.VarbinaryType;
+import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayout;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.NotFoundException;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SchemaTablePrefix;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR;
+import static java.util.Objects.requireNonNull;
+
+public abstract class ArrowAbstractMetadata
+ implements ConnectorMetadata
+{
+ private static final Logger logger = Logger.get(ArrowAbstractMetadata.class);
+ private final ArrowFlightConfig config;
+ private final ArrowFlightClientHandler clientHandler;
+
+ public ArrowAbstractMetadata(ArrowFlightConfig config, ArrowFlightClientHandler clientHandler)
+ {
+ this.config = config;
+ this.clientHandler = requireNonNull(clientHandler);
+ }
+
+ @Override
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
+ {
+ if (!listSchemaNames(session).contains(tableName.getSchemaName())) {
+ return null;
+ }
+
+ if (!listTables(session, Optional.ofNullable(tableName.getSchemaName())).contains(tableName)) {
+ return null;
+ }
+ return new ArrowTableHandle(tableName.getSchemaName(), tableName.getTableName());
+ }
+
+ public List getColumnsList(String schema, String table, ConnectorSession connectorSession)
+ {
+ try {
+ String dataSourceSpecificSchemaName = getDataSourceSpecificSchemaName(config, schema);
+ String dataSourceSpecificTableName = getDataSourceSpecificTableName(config, table);
+ ArrowFlightRequest request = getArrowFlightRequest(clientHandler.getConfig(), Optional.empty(),
+ dataSourceSpecificSchemaName, dataSourceSpecificTableName);
+
+ FlightInfo flightInfo = clientHandler.getFlightInfo(request, connectorSession);
+ List fields = flightInfo.getSchema().getFields();
+ return fields;
+ }
+ catch (Exception e) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "The table columns could not be listed for the table " + table, e);
+ }
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ Map column = new HashMap<>();
+
+ String schemaValue = ((ArrowTableHandle) tableHandle).getSchema();
+ String tableValue = ((ArrowTableHandle) tableHandle).getTable();
+ String dataSourceSpecificSchemaValue = getDataSourceSpecificSchemaName(config, schemaValue);
+ String dataSourceSpecificTableName = getDataSourceSpecificTableName(config, tableValue);
+ List columnList = getColumnsList(dataSourceSpecificSchemaValue, dataSourceSpecificTableName, session);
+
+ for (Field field : columnList) {
+ String columnName = field.getName();
+ logger.debug("The value of the flight columnName is:- %s", columnName);
+ switch (field.getType().getTypeID()) {
+ case Int:
+ ArrowType.Int intType = (ArrowType.Int) field.getType();
+ switch (intType.getBitWidth()) {
+ case 64:
+ column.put(columnName, new ArrowColumnHandle(columnName, BigintType.BIGINT));
+ break;
+ case 32:
+ column.put(columnName, new ArrowColumnHandle(columnName, IntegerType.INTEGER));
+ break;
+ case 16:
+ column.put(columnName, new ArrowColumnHandle(columnName, SmallintType.SMALLINT));
+ break;
+ case 8:
+ column.put(columnName, new ArrowColumnHandle(columnName, TinyintType.TINYINT));
+ break;
+ default:
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid bit width " + intType.getBitWidth());
+ }
+ break;
+ case Binary:
+ case LargeBinary:
+ case FixedSizeBinary:
+ column.put(columnName, new ArrowColumnHandle(columnName, VarbinaryType.VARBINARY));
+ break;
+ case Date:
+ column.put(columnName, new ArrowColumnHandle(columnName, DateType.DATE));
+ break;
+ case Timestamp:
+ column.put(columnName, new ArrowColumnHandle(columnName, TimestampType.TIMESTAMP));
+ break;
+ case Utf8:
+ case LargeUtf8:
+ column.put(columnName, new ArrowColumnHandle(columnName, VarcharType.VARCHAR));
+ break;
+ case FloatingPoint:
+ ArrowType.FloatingPoint floatingPoint = (ArrowType.FloatingPoint) field.getType();
+ switch (floatingPoint.getPrecision()) {
+ case SINGLE:
+ column.put(columnName, new ArrowColumnHandle(columnName, RealType.REAL));
+ break;
+ case DOUBLE:
+ column.put(columnName, new ArrowColumnHandle(columnName, DoubleType.DOUBLE));
+ break;
+ default:
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid floating point precision " + floatingPoint.getPrecision());
+ }
+ break;
+ case Decimal:
+ ArrowType.Decimal decimalType = (ArrowType.Decimal) field.getType();
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ column.put(columnName, new ArrowColumnHandle(columnName, DecimalType.createDecimalType(precision, scale)));
+ break;
+ case Bool:
+ column.put(columnName, new ArrowColumnHandle(columnName, BooleanType.BOOLEAN));
+ break;
+ case Time:
+ column.put(columnName, new ArrowColumnHandle(columnName, TimeType.TIME));
+ break;
+ default:
+ throw new UnsupportedOperationException("The data type " + field.getType().getTypeID() + " is not supported.");
+ }
+ }
+ return column;
+ }
+
+ @Override
+ public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional> desiredColumns)
+ {
+ ArrowTableHandle tableHandle = (ArrowTableHandle) table;
+
+ List columns = new ArrayList<>();
+ if (desiredColumns.isPresent()) {
+ List arrowColumns = new ArrayList<>(desiredColumns.get());
+ columns = (List) (List>) arrowColumns;
+ }
+
+ ConnectorTableLayout layout = new ConnectorTableLayout(new ArrowTableLayoutHandle(tableHandle, columns, constraint.getSummary(), Optional.empty()));
+ return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+ }
+
+ @Override
+ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
+ {
+ return new ConnectorTableLayout(handle);
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
+ {
+ List meta = new ArrayList<>();
+ List columnList = getColumnsList(((ArrowTableHandle) table).getSchema(), ((ArrowTableHandle) table).getTable(), session);
+
+ for (Field field : columnList) {
+ String columnName = field.getName();
+ switch (field.getType().getTypeID()) {
+ case Int:
+ ArrowType.Int intType = (ArrowType.Int) field.getType();
+ switch (intType.getBitWidth()) {
+ case 64:
+ meta.add(new ColumnMetadata(columnName, BigintType.BIGINT));
+ break;
+ case 32:
+ meta.add(new ColumnMetadata(columnName, IntegerType.INTEGER));
+ break;
+ case 16:
+ meta.add(new ColumnMetadata(columnName, SmallintType.SMALLINT));
+ break;
+ case 8:
+ meta.add(new ColumnMetadata(columnName, TinyintType.TINYINT));
+ break;
+ default:
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid bit width " + intType.getBitWidth());
+ }
+ break;
+ case Binary:
+ case LargeBinary:
+ case FixedSizeBinary:
+ meta.add(new ColumnMetadata(columnName, VarbinaryType.VARBINARY));
+ break;
+ case Date:
+ meta.add(new ColumnMetadata(columnName, DateType.DATE));
+ break;
+ case Timestamp:
+ meta.add(new ColumnMetadata(columnName, TimestampType.TIMESTAMP));
+ break;
+ case Utf8:
+ case LargeUtf8:
+ meta.add(new ColumnMetadata(columnName, VarcharType.VARCHAR));
+ break;
+ case FloatingPoint:
+ ArrowType.FloatingPoint floatingPoint = (ArrowType.FloatingPoint) field.getType();
+ switch (floatingPoint.getPrecision()) {
+ case SINGLE:
+ meta.add(new ColumnMetadata(columnName, RealType.REAL));
+ break;
+ case DOUBLE:
+ meta.add(new ColumnMetadata(columnName, DoubleType.DOUBLE));
+ break;
+ default:
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid floating point precision " + floatingPoint.getPrecision());
+ }
+ break;
+ case Decimal:
+ ArrowType.Decimal decimalType = (ArrowType.Decimal) field.getType();
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ meta.add(new ColumnMetadata(columnName, DecimalType.createDecimalType(precision, scale)));
+ break;
+ case Time:
+ meta.add(new ColumnMetadata(columnName, TimeType.TIME));
+ break;
+ case Bool:
+ meta.add(new ColumnMetadata(columnName, BooleanType.BOOLEAN));
+ break;
+ default:
+ throw new UnsupportedOperationException("The data type " + field.getType().getTypeID() + " is not supported.");
+ }
+ }
+ return new ConnectorTableMetadata(new SchemaTableName(((ArrowTableHandle) table).getSchema(), ((ArrowTableHandle) table).getTable()), meta);
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ return ((ArrowColumnHandle) columnHandle).getColumnMetadata();
+ }
+
+ @Override
+ public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
+ {
+ requireNonNull(prefix, "prefix is null");
+ ImmutableMap.Builder> columns = ImmutableMap.builder();
+ List tables;
+ if (prefix.getSchemaName() != null && prefix.getTableName() != null) {
+ tables = ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+ }
+ else {
+ tables = listTables(session, Optional.of(prefix.getSchemaName()));
+ }
+
+ for (SchemaTableName tableName : tables) {
+ try {
+ ConnectorTableHandle tableHandle = getTableHandle(session, tableName);
+ columns.put(tableName, getTableMetadata(session, tableHandle).getColumns());
+ }
+ catch (ClassCastException | NotFoundException e) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "The table columns could not be listed for the table " + tableName, e);
+ }
+ catch (Exception e) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, e.getMessage(), e);
+ }
+ }
+ return columns.build();
+ }
+
+ protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, Optional query, String schema, String table);
+
+ protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, String schema);
+
+ protected abstract String getDataSourceSpecificSchemaName(ArrowFlightConfig config, String schemaName);
+
+ protected abstract String getDataSourceSpecificTableName(ArrowFlightConfig config, String tableName);
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java
new file mode 100644
index 0000000000000..ddd63a6ba5006
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.FixedSplitSource;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import org.apache.arrow.flight.FlightInfo;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class ArrowAbstractSplitManager
+ implements ConnectorSplitManager
+{
+ private static final Logger logger = Logger.get(ArrowAbstractSplitManager.class);
+ private final ArrowFlightClientHandler clientHandler;
+
+ public ArrowAbstractSplitManager(ArrowFlightClientHandler client)
+ {
+ this.clientHandler = client;
+ }
+
+ @Override
+ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingContext splitSchedulingContext)
+ {
+ ArrowTableLayoutHandle tableLayoutHandle = (ArrowTableLayoutHandle) layout;
+ ArrowTableHandle tableHandle = tableLayoutHandle.getTableHandle();
+ ArrowFlightRequest request = getArrowFlightRequest(clientHandler.getConfig(),
+ tableLayoutHandle);
+
+ FlightInfo flightInfo = clientHandler.getFlightInfo(request, session);
+ List splits = flightInfo.getEndpoints()
+ .stream()
+ .map(info -> new ArrowSplit(
+ tableHandle.getSchema(),
+ tableHandle.getTable(),
+ info.getTicket().getBytes(),
+ info.getLocations().stream().map(location -> location.getUri().toString()).collect(Collectors.toList())))
+ .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
+ logger.info("created %d splits from arrow tickets", splits.size());
+ return new FixedSplitSource(splits);
+ }
+
+ protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, ArrowTableLayoutHandle tableLayoutHandle);
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowColumnHandle.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowColumnHandle.java
new file mode 100644
index 0000000000000..e935d994dccb0
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowColumnHandle.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowColumnHandle
+ implements ColumnHandle
+{
+ private final String columnName;
+ private final Type columnType;
+
+ @JsonCreator
+ public ArrowColumnHandle(
+ @JsonProperty("columnName") String columnName,
+ @JsonProperty("columnType") Type columnType)
+ {
+ this.columnName = requireNonNull(columnName, "columnName is null");
+ this.columnType = requireNonNull(columnType, "type is null");
+ }
+
+ @JsonProperty
+ public String getColumnName()
+ {
+ return columnName;
+ }
+
+ @JsonProperty
+ public Type getColumnType()
+ {
+ return columnType;
+ }
+
+ public ColumnMetadata getColumnMetadata()
+ {
+ return new ColumnMetadata(columnName, columnType);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java
new file mode 100644
index 0000000000000..9523bd210faa5
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import com.google.inject.Inject;
+
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowConnector
+ implements Connector
+{
+ private final ConnectorMetadata metadata;
+ private final ConnectorSplitManager splitManager;
+ private final ConnectorPageSourceProvider pageSourceProvider;
+ private final ConnectorHandleResolver handleResolver;
+
+ @Inject
+ public ArrowConnector(ConnectorMetadata metadata,
+ ConnectorHandleResolver handleResolver,
+ ConnectorSplitManager splitManager,
+ ConnectorPageSourceProvider pageSourceProvider)
+ {
+ this.metadata = requireNonNull(metadata, "Metadata is null");
+ this.handleResolver = requireNonNull(handleResolver, "Metadata is null");
+ this.splitManager = requireNonNull(splitManager, "SplitManager is null");
+ this.pageSourceProvider = requireNonNull(pageSourceProvider, "PageSinkProvider is null");
+ }
+
+ public Optional getHandleResolver()
+ {
+ return Optional.of(handleResolver);
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
+ {
+ return ArrowTransactionHandle.INSTANCE;
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider()
+ {
+ return pageSourceProvider;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java
new file mode 100644
index 0000000000000..f17317dd42b3a
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.bootstrap.Bootstrap;
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.NodeManager;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorContext;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.relation.RowExpressionService;
+import com.google.inject.ConfigurationException;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+
+import java.util.Map;
+
+import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static java.util.Objects.requireNonNull;
+
+public class ArrowConnectorFactory
+ implements ConnectorFactory
+{
+ private final String name;
+ private final Module module;
+ private final ClassLoader classLoader;
+
+ public ArrowConnectorFactory(String name, Module module, ClassLoader classLoader)
+ {
+ checkArgument(!isNullOrEmpty(name), "name is null or empty");
+ this.name = name;
+ this.module = requireNonNull(module, "module is null");
+ this.classLoader = requireNonNull(classLoader, "classLoader is null");
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public ConnectorHandleResolver getHandleResolver()
+ {
+ return new ArrowHandleResolver();
+ }
+
+ @Override
+ public Connector create(String catalogName, Map requiredConfig, ConnectorContext context)
+ {
+ requireNonNull(requiredConfig, "requiredConfig is null");
+
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ Bootstrap app = new Bootstrap(
+ binder -> {
+ binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager());
+ binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution());
+ binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService());
+ binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+ },
+ new ArrowModule(catalogName),
+ module);
+
+ Injector injector = app
+ .doNotInitializeLogging()
+ .setRequiredConfigurationProperties(requiredConfig)
+ .initialize();
+
+ return injector.getInstance(ArrowConnector.class);
+ }
+ catch (ConfigurationException ex) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "The connector instance could not be created.", ex);
+ }
+ catch (Exception e) {
+ throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorId.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorId.java
new file mode 100644
index 0000000000000..dce08bac4ac24
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowConnectorId
+{
+ private final String id;
+
+ public ArrowConnectorId(String id)
+ {
+ this.id = requireNonNull(id, "id is null");
+ }
+
+ @Override
+ public String toString()
+ {
+ return id;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(id);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+ ArrowConnectorId other = (ArrowConnectorId) obj;
+ return Objects.equals(this.id, other.id);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowErrorCode.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowErrorCode.java
new file mode 100644
index 0000000000000..2e33f736a62c5
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowErrorCode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.common.ErrorCode;
+import com.facebook.presto.common.ErrorType;
+import com.facebook.presto.spi.ErrorCodeSupplier;
+
+import static com.facebook.presto.common.ErrorType.EXTERNAL;
+import static com.facebook.presto.common.ErrorType.INTERNAL_ERROR;
+
+public enum ArrowErrorCode
+ implements ErrorCodeSupplier
+{
+ ARROW_INVALID_TABLE(0, EXTERNAL),
+ ARROW_INVALID_CREDENTAILS(1, EXTERNAL),
+ ARROW_FLIGHT_ERROR(2, EXTERNAL),
+ ARROW_INTERNAL_ERROR(3, INTERNAL_ERROR);
+
+ private final ErrorCode errorCode;
+
+ ArrowErrorCode(int code, ErrorType type)
+ {
+ errorCode = new ErrorCode(code + 0x0509_0000, name(), type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowException.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowException.java
new file mode 100644
index 0000000000000..ba2c6edba589c
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ErrorCodeSupplier;
+import com.facebook.presto.spi.PrestoException;
+
+public class ArrowException
+ extends PrestoException
+{
+ public ArrowException(ErrorCodeSupplier errorCode, String message)
+ {
+ super(errorCode, message);
+ }
+
+ public ArrowException(ErrorCodeSupplier errorCode, String message, Throwable cause)
+ {
+ super(errorCode, message, cause);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowExpression.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowExpression.java
new file mode 100644
index 0000000000000..b5c30bfcaf4a0
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowExpression.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.relation.ConstantExpression;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowExpression
+{
+ private final String expression;
+ private final List boundConstantValues;
+
+ public ArrowExpression(String expression)
+ {
+ this(expression, ImmutableList.of());
+ }
+
+ @JsonCreator
+ public ArrowExpression(
+ @JsonProperty("translatedString") String expression,
+ @JsonProperty("boundConstantValues") List constantBindValues)
+ {
+ this.expression = requireNonNull(expression, "expression is null");
+ this.boundConstantValues = requireNonNull(constantBindValues, "boundConstantValues is null");
+ }
+
+ @JsonProperty
+ public String getExpression()
+ {
+ return expression;
+ }
+
+ /**
+ * Constant expressions are not added to the expression String. Instead they appear as "?" in the query.
+ * This is because we would potentially lose precision on double values. Hence when we make a PreparedStatement
+ * out of the SQL string replacing every "?" by it's corresponding actual bindValue.
+ *
+ * @return List of constants to replace in the SQL string.
+ */
+ @JsonProperty
+ public List getBoundConstantValues()
+ {
+ return boundConstantValues;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ArrowExpression that = (ArrowExpression) o;
+ return expression.equals(that.expression) &&
+ boundConstantValues.equals(that.boundConstantValues);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(expression, boundConstantValues);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java
new file mode 100644
index 0000000000000..e6a6b2ec919ff
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.memory.RootAllocator;
+
+import java.io.InputStream;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowFlightClient
+{
+ private final FlightClient flightClient;
+ private final Optional trustedCertificate;
+ private RootAllocator allocator;
+
+ public ArrowFlightClient(FlightClient flightClient, Optional trustedCertificate, RootAllocator allocator)
+ {
+ this.flightClient = requireNonNull(flightClient, "flightClient cannot be null");
+ this.trustedCertificate = trustedCertificate;
+ this.allocator = allocator;
+ }
+
+ public FlightClient getFlightClient()
+ {
+ return flightClient;
+ }
+
+ public Optional getTrustedCertificate()
+ {
+ return trustedCertificate;
+ }
+
+ public void close() throws Exception
+ {
+ flightClient.close();
+ if (trustedCertificate.isPresent()) {
+ trustedCertificate.get().close();
+ }
+ if (allocator != null) {
+ allocator.close();
+ allocator = null;
+ }
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java
new file mode 100644
index 0000000000000..86671fd6a7090
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.ConnectorSession;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.grpc.CredentialCallOption;
+import org.apache.arrow.memory.RootAllocator;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR;
+
+public abstract class ArrowFlightClientHandler
+{
+ private static final Logger logger = Logger.get(ArrowFlightClientHandler.class);
+ private static final int TIMER_DURATION_IN_MINUTES = 30;
+ private final ArrowFlightConfig config;
+ private AtomicBoolean isClientClosed = new AtomicBoolean(true);
+ private Optional trustedCertificate = Optional.empty();
+ private ScheduledExecutorService scheduledExecutorService;
+ private ArrowFlightClient arrowFlightClient;
+ private RootAllocator allocator;
+
+ public ArrowFlightClientHandler(ArrowFlightConfig config)
+ {
+ this.config = config;
+ }
+
+ public ArrowFlightConfig getConfig()
+ {
+ return config;
+ }
+
+ public synchronized ArrowFlightClient getClient(Optional uri)
+ {
+ if (isClientClosed.get()) {
+ logger.info("Reinitialize the client if closed or not initialized");
+ initializeClient(uri);
+ scheduleCloseTask();
+ }
+ else {
+ resetTimer(); // Reset timer when client is reused
+ }
+ return this.arrowFlightClient;
+ }
+
+ public FlightInfo getFlightInfo(ArrowFlightRequest request, ConnectorSession connectorSession)
+ {
+ try {
+ ArrowFlightClient client = getClient(Optional.empty());
+ CredentialCallOption auth = this.getCallOptions(connectorSession);
+ FlightDescriptor descriptor = FlightDescriptor.command(request.getCommand());
+ logger.debug("Fetching flight info");
+ FlightInfo flightInfo = client.getFlightClient().getInfo(descriptor, auth);
+ logger.debug("got flight info");
+ return flightInfo;
+ }
+ catch (Exception e) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "The flight information could not be obtained from the flight server." + e.getMessage(), e);
+ }
+ }
+
+ public synchronized void close() throws Exception
+ {
+ if (arrowFlightClient != null) {
+ arrowFlightClient.close();
+ arrowFlightClient = null;
+ }
+ if (trustedCertificate.isPresent()) {
+ trustedCertificate.get().close();
+ }
+ shutdownTimer();
+ isClientClosed.set(true);
+ }
+
+ public void resetTimer()
+ {
+ shutdownTimer();
+ scheduleCloseTask();
+ }
+
+ public void shutdownTimer()
+ {
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
+ }
+ }
+
+ protected abstract CredentialCallOption getCallOptions(ConnectorSession connectorSession);
+
+ private void initializeClient(Optional uri)
+ {
+ if (!isClientClosed.get()) {
+ return;
+ }
+ try {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ Optional trustedCertificate = Optional.empty();
+
+ Location location;
+ if (uri.isPresent()) {
+ location = new Location(uri.get());
+ }
+ else {
+ if (config.getArrowFlightServerSslEnabled() != null && !config.getArrowFlightServerSslEnabled()) {
+ location = Location.forGrpcInsecure(config.getFlightServerName(), config.getArrowFlightPort());
+ }
+ else {
+ location = Location.forGrpcTls(config.getFlightServerName(), config.getArrowFlightPort());
+ }
+ }
+
+ FlightClient.Builder flightClientBuilder = FlightClient.builder(allocator, location);
+ if (config.getVerifyServer() != null && !config.getVerifyServer()) {
+ flightClientBuilder.verifyServer(false);
+ }
+ else if (config.getFlightServerSSLCertificate() != null) {
+ trustedCertificate = Optional.of(new FileInputStream(config.getFlightServerSSLCertificate()));
+ flightClientBuilder.trustedCertificates(trustedCertificate.get()).useTls();
+ }
+
+ FlightClient flightClient = flightClientBuilder.build();
+ this.arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator);
+ isClientClosed.set(false);
+ }
+ catch (Exception ex) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "The flight client could not be obtained." + ex.getMessage(), ex);
+ }
+ }
+
+ private void scheduleCloseTask()
+ {
+ scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ Runnable closeTask = () -> {
+ try {
+ close();
+ logger.info("in closeTask");
+ }
+ catch (Exception e) {
+ logger.error(e);
+ }
+ scheduledExecutorService.shutdown();
+ };
+ scheduledExecutorService.schedule(closeTask, TIMER_DURATION_IN_MINUTES, TimeUnit.MINUTES);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConfig.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConfig.java
new file mode 100644
index 0000000000000..a30301dcdc361
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.configuration.Config;
+
+public class ArrowFlightConfig
+{
+ private String server;
+ private Boolean verifyServer;
+ private String flightServerSSLCertificate;
+ private Boolean arrowFlightServerSslEnabled;
+ private Integer arrowFlightPort;
+ public String getFlightServerName()
+ {
+ return server;
+ }
+
+ public Boolean getVerifyServer()
+ {
+ return verifyServer;
+ }
+
+ public Boolean getArrowFlightServerSslEnabled()
+ {
+ return arrowFlightServerSslEnabled;
+ }
+
+ public String getFlightServerSSLCertificate()
+ {
+ return flightServerSSLCertificate;
+ }
+
+ public Integer getArrowFlightPort()
+ {
+ return arrowFlightPort;
+ }
+
+ @Config("arrow-flight.server")
+ public ArrowFlightConfig setFlightServerName(String server)
+ {
+ this.server = server;
+ return this;
+ }
+
+ @Config("arrow-flight.server.verify")
+ public ArrowFlightConfig setVerifyServer(Boolean verifyServer)
+ {
+ this.verifyServer = verifyServer;
+ return this;
+ }
+
+ @Config("arrow-flight.server.port")
+ public ArrowFlightConfig setArrowFlightPort(Integer arrowFlightPort)
+ {
+ this.arrowFlightPort = arrowFlightPort;
+ return this;
+ }
+
+ @Config("arrow-flight.server-ssl-certificate")
+ public ArrowFlightConfig setFlightServerSSLCertificate(String flightServerSSLCertificate)
+ {
+ this.flightServerSSLCertificate = flightServerSSLCertificate;
+ return this;
+ }
+
+ @Config("arrow-flight.server-ssl-enabled")
+ public ArrowFlightConfig setArrowFlightServerSslEnabled(Boolean arrowFlightServerSslEnabled)
+ {
+ this.arrowFlightServerSslEnabled = arrowFlightServerSslEnabled;
+ return this;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java
new file mode 100644
index 0000000000000..7e04e0a6066e3
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+public interface ArrowFlightRequest
+{
+ byte[] getCommand();
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowHandleResolver.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowHandleResolver.java
new file mode 100644
index 0000000000000..8b231b98a6ee6
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowHandleResolver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+public class ArrowHandleResolver
+ implements ConnectorHandleResolver
+{
+ @Override
+ public Class extends ConnectorTableHandle> getTableHandleClass()
+ {
+ return ArrowTableHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
+ {
+ return ArrowTableLayoutHandle.class;
+ }
+
+ @Override
+ public Class extends ColumnHandle> getColumnHandleClass()
+ {
+ return ArrowColumnHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorSplit> getSplitClass()
+ {
+ return ArrowSplit.class;
+ }
+
+ @Override
+ public Class extends ConnectorTransactionHandle> getTransactionHandleClass()
+ {
+ return ArrowTransactionHandle.class;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowModule.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowModule.java
new file mode 100644
index 0000000000000..b20762f8ad497
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowModule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+public class ArrowModule
+ implements Module
+{
+ protected final String connectorId;
+
+ public ArrowModule(String connectorId)
+ {
+ this.connectorId = requireNonNull(connectorId, "connector id is null");
+ }
+
+ public void configure(Binder binder)
+ {
+ configBinder(binder).bindConfig(ArrowFlightConfig.class);
+ binder.bind(ArrowConnector.class).in(Scopes.SINGLETON);
+ binder.bind(ArrowConnectorId.class).toInstance(new ArrowConnectorId(connectorId));
+ binder.bind(ConnectorHandleResolver.class).to(ArrowHandleResolver.class).in(Scopes.SINGLETON);
+ binder.bind(ArrowPageSourceProvider.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorPageSourceProvider.class).to(ArrowPageSourceProvider.class).in(Scopes.SINGLETON);
+ binder.bind(Connector.class).to(ArrowConnector.class).in(Scopes.SINGLETON);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java
new file mode 100644
index 0000000000000..954689c91b3ae
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.common.Page;
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
+import com.facebook.presto.common.type.DateType;
+import com.facebook.presto.common.type.DecimalType;
+import com.facebook.presto.common.type.Decimals;
+import com.facebook.presto.common.type.TimeType;
+import com.facebook.presto.common.type.TimestampType;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR;
+
+public class ArrowPageSource
+ implements ConnectorPageSource
+{
+ private static Logger logger = Logger.get(ArrowPageSource.class);
+ private final ArrowSplit split;
+ private final List columnHandles;
+ private final ArrowFlightClientHandler clientHandler;
+ private boolean completed;
+ private int currentPosition;
+ private Optional vectorSchemaRoot = Optional.empty();
+ private ArrowFlightClient flightClient;
+ private FlightStream flightStream;
+
+ public ArrowPageSource(ArrowSplit split, List columnHandles, ArrowFlightClientHandler clientHandler,
+ ConnectorSession connectorSession)
+ {
+ this.columnHandles = columnHandles;
+ this.split = split;
+ this.clientHandler = clientHandler;
+ getFlightStream(clientHandler, split.getTicket(), connectorSession);
+ }
+
+ @Override
+ public long getCompletedBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getCompletedPositions()
+ {
+ return currentPosition;
+ }
+
+ @Override
+ public long getReadTimeNanos()
+ {
+ return 0;
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return completed;
+ }
+
+ @Override
+ public long getSystemMemoryUsage()
+ {
+ return 0;
+ }
+
+ @Override
+ public Page getNextPage()
+ {
+ if (vectorSchemaRoot.isPresent()) {
+ vectorSchemaRoot.get().close();
+ vectorSchemaRoot = Optional.empty();
+ }
+
+ if (flightStream.next()) {
+ vectorSchemaRoot = Optional.ofNullable(flightStream.getRoot());
+ }
+
+ if (!vectorSchemaRoot.isPresent()) {
+ completed = true;
+ }
+
+ if (isFinished()) {
+ return null;
+ }
+
+ currentPosition++;
+
+ List blocks = new ArrayList<>();
+ for (int columnIndex = 0; columnIndex < columnHandles.size(); columnIndex++) {
+ FieldVector vector = vectorSchemaRoot.get().getVector(columnIndex);
+ Type type = columnHandles.get(columnIndex).getColumnType();
+
+ Block block = buildBlockFromVector(vector, type);
+ blocks.add(block);
+ }
+
+ return new Page(vectorSchemaRoot.get().getRowCount(), blocks.toArray(new Block[0]));
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (vectorSchemaRoot.isPresent()) {
+ vectorSchemaRoot.get().close();
+ }
+ if (flightStream != null) {
+ try {
+ flightStream.close();
+ }
+ catch (Exception e) {
+ logger.error(e);
+ }
+ }
+ }
+
+ private void getFlightStream(ArrowFlightClientHandler clientHandler, byte[] ticket, ConnectorSession connectorSession)
+ {
+ try {
+ Optional uri = split.getLocationUrls().isEmpty() ? Optional.empty() : Optional.of(split.getLocationUrls().get(0));
+ flightClient = clientHandler.getClient(uri);
+ flightStream = flightClient.getFlightClient().getStream(new Ticket(ticket), clientHandler.getCallOptions(connectorSession));
+ }
+ catch (FlightRuntimeException e) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, e.getMessage(), e);
+ }
+ }
+
+ private Block buildBlockFromVector(FieldVector vector, Type type)
+ {
+ if (vector instanceof BitVector) {
+ return buildBlockFromBitVector((BitVector) vector, type);
+ }
+ else if (vector instanceof TinyIntVector) {
+ return buildBlockFromTinyIntVector((TinyIntVector) vector, type);
+ }
+ else if (vector instanceof IntVector) {
+ return buildBlockFromIntVector((IntVector) vector, type);
+ }
+ else if (vector instanceof SmallIntVector) {
+ return buildBlockFromSmallIntVector((SmallIntVector) vector, type);
+ }
+ else if (vector instanceof BigIntVector) {
+ return buildBlockFromBigIntVector((BigIntVector) vector, type);
+ }
+ else if (vector instanceof DecimalVector) {
+ return buildBlockFromDecimalVector((DecimalVector) vector, type);
+ }
+ else if (vector instanceof NullVector) {
+ return buildBlockFromNullVector((NullVector) vector, type);
+ }
+ else if (vector instanceof TimeStampMicroVector) {
+ return buildBlockFromTimeStampMicroVector((TimeStampMicroVector) vector, type);
+ }
+ else if (vector instanceof TimeStampMilliVector) {
+ return buildBlockFromTimeStampMilliVector((TimeStampMilliVector) vector, type);
+ }
+ else if (vector instanceof Float4Vector) {
+ return buildBlockFromFloat4Vector((Float4Vector) vector, type);
+ }
+ else if (vector instanceof Float8Vector) {
+ return buildBlockFromFloat8Vector((Float8Vector) vector, type);
+ }
+ else if (vector instanceof VarCharVector) {
+ return buildBlockFromVarCharVector((VarCharVector) vector, type);
+ }
+ else if (vector instanceof VarBinaryVector) {
+ return buildBlockFromVarBinaryVector((VarBinaryVector) vector, type);
+ }
+ else if (vector instanceof DateDayVector) {
+ return buildBlockFromDateDayVector((DateDayVector) vector, type);
+ }
+ else if (vector instanceof DateMilliVector) {
+ return buildBlockFromDateMilliVector((DateMilliVector) vector, type);
+ }
+ else if (vector instanceof TimeMilliVector) {
+ return buildBlockFromTimeMilliVector((TimeMilliVector) vector, type);
+ }
+ else if (vector instanceof TimeSecVector) {
+ return buildBlockFromTimeSecVector((TimeSecVector) vector, type);
+ }
+ else if (vector instanceof TimeStampSecVector) {
+ return buildBlockFromTimeStampSecVector((TimeStampSecVector) vector, type);
+ }
+ else if (vector instanceof TimeMicroVector) {
+ return buildBlockFromTimeMicroVector((TimeMicroVector) vector, type);
+ }
+ throw new UnsupportedOperationException("Unsupported vector type: " + vector.getClass().getSimpleName());
+ }
+
+ private Block buildBlockFromBitVector(BitVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeBoolean(builder, vector.get(i) == 1);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromIntVector(IntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromSmallIntVector(SmallIntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTinyIntVector(TinyIntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromBigIntVector(BigIntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromDecimalVector(DecimalVector vector, Type type)
+ {
+ if (!(type instanceof DecimalType)) {
+ throw new IllegalArgumentException("Type must be a DecimalType for DecimalVector");
+ }
+
+ DecimalType decimalType = (DecimalType) type;
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ BigDecimal decimal = vector.getObject(i); // Get the BigDecimal value
+ if (decimalType.isShort()) {
+ builder.writeLong(decimal.unscaledValue().longValue());
+ }
+ else {
+ Slice slice = Decimals.encodeScaledValue(decimal);
+ decimalType.writeSlice(builder, slice, 0, slice.length());
+ }
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromNullVector(NullVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ builder.appendNull();
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeStampMicroVector(TimeStampMicroVector vector, Type type)
+ {
+ if (!(type instanceof TimestampType)) {
+ throw new IllegalArgumentException("Expected TimestampType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long micros = vector.get(i);
+ long millis = TimeUnit.MICROSECONDS.toMillis(micros);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeStampMilliVector(TimeStampMilliVector vector, Type type)
+ {
+ if (!(type instanceof TimestampType)) {
+ throw new IllegalArgumentException("Expected TimestampType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long millis = vector.get(i);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromFloat8Vector(Float8Vector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeDouble(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromFloat4Vector(Float4Vector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ int intBits = Float.floatToIntBits(vector.get(i));
+ type.writeLong(builder, intBits);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromVarBinaryVector(VarBinaryVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ byte[] value = vector.get(i);
+ type.writeSlice(builder, Slices.wrappedBuffer(value));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromVarCharVector(VarCharVector vector, Type type)
+ {
+ if (!(type instanceof VarcharType)) {
+ throw new IllegalArgumentException("Expected VarcharType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ String value = new String(vector.get(i), StandardCharsets.UTF_8);
+ type.writeSlice(builder, Slices.utf8Slice(value));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromDateDayVector(DateDayVector vector, Type type)
+ {
+ if (!(type instanceof DateType)) {
+ throw new IllegalArgumentException("Expected DateType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromDateMilliVector(DateMilliVector vector, Type type)
+ {
+ if (!(type instanceof DateType)) {
+ throw new IllegalArgumentException("Expected DateType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ DateType dateType = (DateType) type;
+ long days = TimeUnit.MILLISECONDS.toDays(vector.get(i));
+ dateType.writeLong(builder, days);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeSecVector(TimeSecVector vector, Type type)
+ {
+ if (!(type instanceof TimeType)) {
+ throw new IllegalArgumentException("Type must be a TimeType for TimeSecVector");
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ int value = vector.get(i);
+ long millis = TimeUnit.SECONDS.toMillis(value);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeMilliVector(TimeMilliVector vector, Type type)
+ {
+ if (!(type instanceof TimeType)) {
+ throw new IllegalArgumentException("Type must be a TimeType for TimeSecVector");
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ int value = vector.get(i);
+ long millis = TimeUnit.MILLISECONDS.toMillis(value);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeMicroVector(TimeMicroVector vector, Type type)
+ {
+ if (!(type instanceof TimeType)) {
+ throw new IllegalArgumentException("Type must be a TimeType for TimemicroVector");
+ }
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long value = vector.get(i);
+ long micro = TimeUnit.MICROSECONDS.toMillis(value);
+ type.writeLong(builder, micro);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeStampSecVector(TimeStampSecVector vector, Type type)
+ {
+ if (!(type instanceof TimestampType)) {
+ throw new IllegalArgumentException("Type must be a TimestampType for TimeStampSecVector");
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long value = vector.get(i);
+ long millis = TimeUnit.SECONDS.toMillis(value);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSourceProvider.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSourceProvider.java
new file mode 100644
index 0000000000000..f3bb41c3e35d4
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSourceProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.SplitContext;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.common.collect.ImmutableList;
+
+import javax.inject.Inject;
+
+import java.util.List;
+
+public class ArrowPageSourceProvider
+ implements ConnectorPageSourceProvider
+{
+ private static final Logger logger = Logger.get(ArrowPageSourceProvider.class);
+ private ArrowFlightClientHandler clientHandler;
+ @Inject
+ public ArrowPageSourceProvider(ArrowFlightClientHandler clientHandler)
+ {
+ this.clientHandler = clientHandler;
+ }
+
+ @Override
+ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List columns, SplitContext splitContext)
+ {
+ ImmutableList.Builder columnHandles = ImmutableList.builder();
+ for (ColumnHandle handle : columns) {
+ columnHandles.add((ArrowColumnHandle) handle);
+ }
+ ArrowSplit arrowSplit = (ArrowSplit) split;
+ logger.debug("Processing split with flight ticket");
+ return new ArrowPageSource(arrowSplit, columnHandles.build(), clientHandler, session);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPlugin.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPlugin.java
new file mode 100644
index 0000000000000..7744af3ecd357
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPlugin.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Module;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
+
+public class ArrowPlugin
+ implements Plugin
+{
+ protected final String name;
+ protected final Module module;
+
+ public ArrowPlugin(String name, Module module)
+ {
+ checkArgument(!isNullOrEmpty(name), "name is null or empty");
+ this.name = name;
+ this.module = requireNonNull(module, "module is null");
+ }
+
+ @Override
+ public Iterable getConnectorFactories()
+ {
+ return ImmutableList.of(new ArrowConnectorFactory(name, module, getClassLoader()));
+ }
+
+ private static ClassLoader getClassLoader()
+ {
+ return firstNonNull(Thread.currentThread().getContextClassLoader(), ArrowPlugin.class.getClassLoader());
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowSplit.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowSplit.java
new file mode 100644
index 0000000000000..db65912de8c58
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowSplit.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.HostAddress;
+import com.facebook.presto.spi.NodeProvider;
+import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ArrowSplit
+ implements ConnectorSplit
+{
+ private final String schemaName;
+ private final String tableName;
+ private final byte[] ticket;
+ private final List locationUrls;
+
+ @JsonCreator
+ public ArrowSplit(
+ @JsonProperty("schemaName") @Nullable String schemaName,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("ticket") byte[] ticket,
+ @JsonProperty("locationUrls") List locationUrls)
+ {
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ this.ticket = ticket;
+ this.locationUrls = locationUrls;
+ }
+
+ @Override
+ public NodeSelectionStrategy getNodeSelectionStrategy()
+ {
+ return NodeSelectionStrategy.NO_PREFERENCE;
+ }
+
+ @Override
+ public List getPreferredNodes(NodeProvider nodeProvider)
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Object getInfo()
+ {
+ return this.getInfoMap();
+ }
+
+ @JsonProperty
+ public String getSchemaName()
+ {
+ return schemaName;
+ }
+
+ @JsonProperty
+ public String getTableName()
+ {
+ return tableName;
+ }
+
+ @JsonProperty
+ public byte[] getTicket()
+ {
+ return ticket;
+ }
+
+ @JsonProperty
+ public List getLocationUrls()
+ {
+ return locationUrls;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableHandle.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableHandle.java
new file mode 100644
index 0000000000000..e0f8c6586791d
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableHandle.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class ArrowTableHandle
+ implements ConnectorTableHandle
+{
+ private final String schema;
+ private final String table;
+
+ @JsonCreator
+ public ArrowTableHandle(
+ @JsonProperty("schema") String schema,
+ @JsonProperty("table") String table)
+ {
+ this.schema = schema;
+ this.table = table;
+ }
+
+ @JsonProperty("schema")
+ public String getSchema()
+ {
+ return schema;
+ }
+
+ @JsonProperty("table")
+ public String getTable()
+ {
+ return table;
+ }
+
+ @Override
+ public String toString()
+ {
+ return schema + ":" + table;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableLayoutHandle.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableLayoutHandle.java
new file mode 100644
index 0000000000000..c62aea21ca449
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTableLayoutHandle.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowTableLayoutHandle
+ implements ConnectorTableLayoutHandle
+{
+ private final ArrowTableHandle tableHandle;
+ private final List columnHandles;
+ private final TupleDomain tupleDomain;
+ private final Optional additionalPredicate;
+
+ @JsonCreator
+ public ArrowTableLayoutHandle(@JsonProperty("table") ArrowTableHandle table,
+ @JsonProperty("columnHandles") List columnHandles,
+ @JsonProperty("tupleDomain") TupleDomain domain,
+ @JsonProperty("additionalPredicate") Optional additionalPredicate)
+ {
+ this.tableHandle = requireNonNull(table, "table is null");
+ this.columnHandles = requireNonNull(columnHandles, "columns are null");
+ this.tupleDomain = requireNonNull(domain, "domain is null");
+ this.additionalPredicate = additionalPredicate;
+ }
+
+ @JsonProperty("table")
+ public ArrowTableHandle getTableHandle()
+ {
+ return tableHandle;
+ }
+
+ @JsonProperty("tupleDomain")
+ public TupleDomain getTupleDomain()
+ {
+ return tupleDomain;
+ }
+
+ @JsonProperty("additionalPredicate")
+ public Optional getAdditionalPredicate()
+ {
+ return additionalPredicate;
+ }
+
+ @JsonProperty("columnHandles")
+ public List getColumnHandles()
+ {
+ return columnHandles;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTransactionHandle.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTransactionHandle.java
new file mode 100644
index 0000000000000..07eb7385cfbcf
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowTransactionHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+public enum ArrowTransactionHandle
+ implements ConnectorTransactionHandle
+{
+ INSTANCE
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowServer.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowServer.java
new file mode 100644
index 0000000000000..334b744f0e881
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowServer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import org.apache.arrow.flight.Action;
+import org.apache.arrow.flight.ActionType;
+import org.apache.arrow.flight.Criteria;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightProducer;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.PutResult;
+import org.apache.arrow.flight.Result;
+import org.apache.arrow.flight.Ticket;
+
+public class ArrowServer
+ implements FlightProducer
+{
+ @Override
+ public void getStream(CallContext callContext, Ticket ticket, ServerStreamListener serverStreamListener)
+ {
+ }
+
+ @Override
+ public void listFlights(CallContext callContext, Criteria criteria, StreamListener streamListener)
+ {
+ }
+
+ @Override
+ public FlightInfo getFlightInfo(CallContext callContext, FlightDescriptor flightDescriptor)
+ {
+ return null;
+ }
+
+ @Override
+ public Runnable acceptPut(CallContext callContext, FlightStream flightStream, StreamListener streamListener)
+ {
+ return null;
+ }
+
+ @Override
+ public void doAction(CallContext callContext, Action action, StreamListener streamListener)
+ {
+ }
+
+ @Override
+ public void listActions(CallContext callContext, StreamListener streamListener)
+ {
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowTableHandleTest.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowTableHandleTest.java
new file mode 100644
index 0000000000000..cd8d3984cc533
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/ArrowTableHandleTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class ArrowTableHandleTest
+{
+ @Test
+ public void testConstructorAndGetters()
+ {
+ ArrowTableHandle handle = new ArrowTableHandle("test_schema", "test_table");
+ assertEquals(handle.getSchema(), "test_schema");
+ assertEquals(handle.getTable(), "test_table");
+ }
+
+ @Test
+ public void testToString()
+ {
+ ArrowTableHandle handle = new ArrowTableHandle("test_schema", "test_table");
+ assertEquals(handle.toString(), "test_schema:test_table");
+ }
+
+ @Test
+ public void testJsonSerialization() throws Exception
+ {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ ArrowTableHandle handle = new ArrowTableHandle("test_schema", "test_table");
+ String json = objectMapper.writeValueAsString(handle);
+
+ assertNotNull(json);
+
+ ArrowTableHandle deserialized = objectMapper.readValue(json, ArrowTableHandle.class);
+ assertEquals(deserialized.getSchema(), handle.getSchema());
+ assertEquals(deserialized.getTable(), handle.getTable());
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractFlightRequest.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractFlightRequest.java
new file mode 100644
index 0000000000000..20921f8105099
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractFlightRequest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+public class TestArrowAbstractFlightRequest
+{
+ @Test
+ public void testGetSchema()
+ {
+ String schema = "test_schema";
+ ArrowAbstractFlightRequest request = new TestArrowFlightRequest(schema);
+ assertEquals(schema, request.getSchema());
+ }
+
+ @Test
+ public void testGetTable()
+ {
+ String schema = "test_schema";
+ String table = "test_table";
+ ArrowAbstractFlightRequest request = new TestArrowFlightRequest(schema, table);
+ assertEquals(table, request.getTable());
+ }
+
+ @Test
+ public void testGetQuery()
+ {
+ String schema = "test_schema";
+ String table = "test_table";
+ String query = "SELECT * FROM test_table";
+ ArrowAbstractFlightRequest request = new TestArrowFlightRequest(schema, table, query);
+ assertTrue(request.getQuery().isPresent());
+ assertEquals(query, request.getQuery().get());
+ }
+
+ @Test
+ public void testEmptyQuery()
+ {
+ String schema = "test_schema";
+ String table = "test_table";
+ ArrowAbstractFlightRequest request = new TestArrowFlightRequest(schema, table);
+ assertFalse(request.getQuery().isPresent());
+ }
+
+ @Test
+ public void testGetCommand()
+ {
+ TestArrowFlightRequest request = new TestArrowFlightRequest("schema");
+
+ byte[] command = request.getCommand();
+
+ assertNotNull(command);
+ assertEquals(0, command.length);
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractMetadata.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractMetadata.java
new file mode 100644
index 0000000000000..378dc325da791
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowAbstractMetadata.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.common.type.BooleanType;
+import com.facebook.presto.common.type.DecimalType;
+import com.facebook.presto.common.type.IntegerType;
+import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.SchemaTableName;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+@Test(singleThreaded = true)
+public class TestArrowAbstractMetadata
+{
+ public TestArrowAbstractMetadata() throws IOException
+ {
+ }
+
+ @Test
+ public void testGetTableMetadata()
+ {
+ // Mock dependencies
+ ArrowAbstractMetadata metadata = mock(ArrowAbstractMetadata.class);
+ Mockito.doCallRealMethod().when(metadata).getTableMetadata(Mockito.any(ConnectorSession.class), Mockito.any(ConnectorTableHandle.class));
+ ConnectorSession session = mock(ConnectorSession.class);
+ ArrowTableHandle tableHandle = new ArrowTableHandle("testSchema", "testTable");
+
+ // Mock the behavior of getColumnsList
+ List columnList = Arrays.asList(
+ new Field("column1", FieldType.notNullable(new ArrowType.Int(32, true)), null),
+ new Field("column2", FieldType.notNullable(new ArrowType.Decimal(10, 2)), null),
+ new Field("column3", FieldType.notNullable(new ArrowType.Bool()), null));
+
+ when(metadata.getColumnsList("testSchema", "testTable", session)).thenReturn(columnList);
+ // Call the method under test
+ ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle);
+
+ // Verify the result
+ assertNotNull(tableMetadata);
+ assertEquals(tableMetadata.getTable(), new SchemaTableName("testSchema", "testTable"));
+ List columns = tableMetadata.getColumns();
+ assertEquals(columns.size(), 3);
+ assertEquals(columns.get(0), new ColumnMetadata("column1", IntegerType.INTEGER));
+ assertEquals(columns.get(1), new ColumnMetadata("column2", DecimalType.createDecimalType(10, 2)));
+ assertEquals(columns.get(2), new ColumnMetadata("column3", BooleanType.BOOLEAN));
+ }
+
+ @Test
+ public void testGetTableLayouts()
+ {
+ // Mock dependencies
+ ConnectorSession session = mock(ConnectorSession.class);
+ ConnectorTableHandle tableHandle = new ArrowTableHandle("testSchema", "testTable");
+
+ // Mock the constraint
+ Constraint trueConstraint = Constraint.alwaysTrue();
+
+ Set desiredColumns = new HashSet<>();
+ desiredColumns.add(new ArrowColumnHandle("column1", IntegerType.INTEGER));
+ desiredColumns.add(new ArrowColumnHandle("column2", VarcharType.VARCHAR));
+
+ // Call the method under test
+ ArrowAbstractMetadata metadata = mock(ArrowAbstractMetadata.class);
+ Mockito.doCallRealMethod().when(metadata).getTableLayouts(Mockito.any(ConnectorSession.class), Mockito.any(ConnectorTableHandle.class), Mockito.any(Constraint.class), Mockito.any(Optional.class));
+
+ List tableLayouts = metadata.getTableLayouts(session, tableHandle, trueConstraint, Optional.of(desiredColumns));
+
+ // Verify the result
+ assertNotNull(tableLayouts);
+ assertEquals(tableLayouts.size(), 1);
+ ConnectorTableLayoutResult layoutResult = tableLayouts.get(0);
+ assertNotNull(layoutResult);
+ assertNotNull(layoutResult.getTableLayout());
+ assertEquals(((ArrowTableLayoutHandle) layoutResult.getTableLayout().getHandle()).getTableHandle(), tableHandle);
+ assertEquals(((ArrowTableLayoutHandle) layoutResult.getTableLayout().getHandle()).getColumnHandles(), desiredColumns);
+ assertEquals(layoutResult.getTableLayout().getPredicate(), trueConstraint.getSummary());
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowColumnHandle.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowColumnHandle.java
new file mode 100644
index 0000000000000..6eca6de0a85c5
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowColumnHandle.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.spi.ColumnMetadata;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestArrowColumnHandle
+{
+ @Test
+ public void testConstructorAndGetters()
+ {
+ String columnName = "TestColumn";
+ Type columnType = VarcharType.createUnboundedVarcharType();
+
+ ArrowColumnHandle columnHandle = new ArrowColumnHandle(columnName, columnType);
+
+ assertEquals(columnHandle.getColumnName(), columnName);
+ assertEquals(columnHandle.getColumnType(), columnType);
+ }
+
+ @Test
+ public void testGetColumnMetadata()
+ {
+ String columnName = "testcolumn";
+ Type columnType = VarcharType.createUnboundedVarcharType();
+
+ ArrowColumnHandle columnHandle = new ArrowColumnHandle(columnName, columnType);
+ ColumnMetadata columnMetadata = columnHandle.getColumnMetadata();
+
+ assertEquals(columnMetadata.getName(), columnName);
+ assertEquals(columnMetadata.getType(), columnType);
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightClient.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightClient.java
new file mode 100644
index 0000000000000..ce5039c3e7186
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightClient.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.memory.RootAllocator;
+import org.testng.annotations.Test;
+
+import java.io.InputStream;
+import java.util.Optional;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+public class TestArrowFlightClient
+{
+ @Test
+ public void testArrowFlightClient()
+ {
+ FlightClient flightClient = mock(FlightClient.class);
+ InputStream certificateStream = mock(InputStream.class);
+ Optional trustedCertificate = Optional.of(certificateStream);
+ RootAllocator allocator = mock(RootAllocator.class);
+
+ ArrowFlightClient arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator);
+
+ assertEquals(arrowFlightClient.getFlightClient(), flightClient);
+ assertTrue(arrowFlightClient.getTrustedCertificate().isPresent());
+ assertEquals(arrowFlightClient.getTrustedCertificate().get(), certificateStream);
+ }
+
+ @Test
+ public void testArrowFlightClientWithoutCertificate()
+ {
+ FlightClient flightClient = mock(FlightClient.class);
+ Optional trustedCertificate = Optional.empty();
+ RootAllocator allocator = mock(RootAllocator.class);
+ ArrowFlightClient arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator);
+
+ assertEquals(arrowFlightClient.getFlightClient(), flightClient);
+ assertFalse(arrowFlightClient.getTrustedCertificate().isPresent());
+ }
+
+ @Test
+ public void testClose() throws Exception
+ {
+ FlightClient flightClient = mock(FlightClient.class);
+ InputStream certificateStream = mock(InputStream.class);
+ Optional trustedCertificate = Optional.of(certificateStream);
+ RootAllocator allocator = mock(RootAllocator.class);
+
+ ArrowFlightClient arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator);
+
+ arrowFlightClient.close();
+
+ verify(flightClient, times(1)).close();
+ verify(certificateStream, times(1)).close();
+ }
+
+ @Test
+ public void testCloseWithoutCertificate() throws Exception
+ {
+ FlightClient flightClient = mock(FlightClient.class);
+ Optional trustedCertificate = Optional.empty();
+ RootAllocator allocator = mock(RootAllocator.class);
+
+ ArrowFlightClient arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator);
+
+ arrowFlightClient.close();
+
+ verify(flightClient, times(1)).close();
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightConfig.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightConfig.java
new file mode 100644
index 0000000000000..3e9e643ebdd29
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.configuration.testing.ConfigAssertions;
+import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+public class TestArrowFlightConfig
+{
+ @Test
+ public void testDefaults()
+ {
+ ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(ArrowFlightConfig.class)
+ .setFlightServerName(null)
+ .setVerifyServer(null)
+ .setFlightServerSSLCertificate(null)
+ .setArrowFlightServerSslEnabled(null)
+ .setArrowFlightPort(null));
+ }
+
+ @Test
+ public void testExplicitPropertyMappings()
+ {
+ Map properties = new ImmutableMap.Builder()
+ .put("arrow-flight.server", "127.0.0.1")
+ .put("arrow-flight.server.verify", "true")
+ .put("arrow-flight.server-ssl-certificate", "cert")
+ .put("arrow-flight.server-ssl-enabled", "true")
+ .put("arrow-flight.server.port", "443")
+ .build();
+
+ ArrowFlightConfig expected = new ArrowFlightConfig()
+ .setFlightServerName("127.0.0.1")
+ .setVerifyServer(true)
+ .setFlightServerSSLCertificate("cert")
+ .setArrowFlightServerSslEnabled(true)
+ .setArrowFlightPort(443);
+
+ ConfigAssertions.assertFullMapping(properties, expected);
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightRequest.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightRequest.java
new file mode 100644
index 0000000000000..0d6d444ff1ec8
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowFlightRequest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import java.util.Optional;
+public class TestArrowFlightRequest
+ extends ArrowAbstractFlightRequest
+{
+ public TestArrowFlightRequest(String schema)
+ {
+ super(schema);
+ }
+
+ public TestArrowFlightRequest(String schema, String table)
+ {
+ super(schema, table);
+ }
+
+ public TestArrowFlightRequest(String schema, String table, String query)
+ {
+ super(schema, table, Optional.of(query));
+ }
+
+ @Override
+ public byte[] getCommand()
+ {
+ return new byte[0];
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSource.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSource.java
new file mode 100644
index 0000000000000..2e245ecb7b46f
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ConnectorSession;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+public class TestArrowPageSource
+{
+ @Mock
+ private ArrowSplit mockSplit;
+
+ @Mock
+ private List mockColumnHandles;
+
+ @Mock
+ private ArrowFlightClientHandler mockClientHandler;
+
+ @Mock
+ private ConnectorSession mockSession;
+
+ @Mock
+ private FlightClient mockFlightClient;
+
+ @Mock
+ private FlightStream mockFlightStream;
+
+ @Mock
+ private VectorSchemaRoot mockVectorSchemaRoot;
+
+ @BeforeClass
+ public void setUp()
+ {
+ MockitoAnnotations.openMocks(this);
+ ArrowFlightClient mockArrowFlightClient = mock(ArrowFlightClient.class);
+ when(mockClientHandler.getClient(any(Optional.class))).thenReturn(mockArrowFlightClient);
+ when(mockArrowFlightClient.getFlightClient()).thenReturn(mockFlightClient);
+ when(mockFlightClient.getStream(any(Ticket.class), any())).thenReturn(mockFlightStream);
+ }
+
+ @Test
+ public void testInitialization()
+ {
+ ArrowPageSource arrowPageSource = new ArrowPageSource(mockSplit, mockColumnHandles, mockClientHandler, mockSession);
+ assertNotNull(arrowPageSource);
+ }
+
+ @Test
+ public void testGetNextPageWithEmptyFlightStream()
+ {
+ when(mockFlightStream.next()).thenReturn(false);
+ ArrowPageSource arrowPageSource = new ArrowPageSource(mockSplit, mockColumnHandles, mockClientHandler, mockSession);
+ assertNull(arrowPageSource.getNextPage());
+ }
+
+ @Test
+ public void testGetNextPageWithNonEmptyFlightStream()
+ {
+ when(mockFlightStream.next()).thenReturn(true);
+ when(mockFlightStream.getRoot()).thenReturn(mockVectorSchemaRoot);
+ when(mockVectorSchemaRoot.getRowCount()).thenReturn(1);
+ ArrowPageSource arrowPageSource = new ArrowPageSource(mockSplit, mockColumnHandles, mockClientHandler, mockSession);
+ assertNotNull(arrowPageSource.getNextPage());
+ }
+
+ @Test
+ public void testCloseResources() throws Exception
+ {
+ ArrowPageSource arrowPageSource = new ArrowPageSource(mockSplit, mockColumnHandles, mockClientHandler, mockSession);
+ arrowPageSource.close();
+ verify(mockFlightStream).close();
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSourceProvider.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSourceProvider.java
new file mode 100644
index 0000000000000..77b89cf5d17cb
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPageSourceProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.SplitContext;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.common.collect.ImmutableList;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Ticket;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+public class TestArrowPageSourceProvider
+{
+ @Test
+ public void testCreatePageSourceWithValidParameters()
+ {
+ ArrowFlightClientHandler clientHandler = mock(ArrowFlightClientHandler.class);
+ ArrowFlightClient flightClient = mock(ArrowFlightClient.class);
+ FlightClient flightClientInstance = mock(FlightClient.class);
+ FlightStream flightStream = mock(FlightStream.class);
+ when(clientHandler.getClient(any())).thenReturn(flightClient);
+ when(flightClient.getFlightClient()).thenReturn(flightClientInstance);
+ when(flightClientInstance.getStream(any(Ticket.class), any(), any())).thenReturn(flightStream);
+ ArrowPageSourceProvider arrowPageSourceProvider = new ArrowPageSourceProvider(clientHandler);
+ ConnectorTransactionHandle transactionHandle = mock(ConnectorTransactionHandle.class);
+ ConnectorSession session = mock(ConnectorSession.class);
+ ArrowSplit split = mock(ArrowSplit.class);
+ List columns = ImmutableList.of(mock(ArrowColumnHandle.class));
+ SplitContext splitContext = mock(SplitContext.class);
+ when(split.getTicket()).thenReturn(new byte[0]);
+ ConnectorPageSource pageSource = arrowPageSourceProvider.createPageSource(transactionHandle, session, split, columns, splitContext);
+ assertNotNull(pageSource);
+ assertTrue(pageSource instanceof ArrowPageSource);
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPlugin.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPlugin.java
new file mode 100644
index 0000000000000..2cd4ee099ddb1
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowPlugin.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import org.testng.annotations.Test;
+
+import static com.facebook.airlift.testing.Assertions.assertInstanceOf;
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+public class TestArrowPlugin
+{
+ @Test
+ public void testStartup()
+ {
+ ArrowModule testModule = new ArrowModule("arrow-flight");
+ ArrowPlugin plugin = new ArrowPlugin("arrow-flight", testModule);
+ ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories());
+ assertInstanceOf(factory, ArrowConnectorFactory.class);
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowSplit.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowSplit.java
new file mode 100644
index 0000000000000..83b0cbf1cc7b5
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestArrowSplit.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.json.JsonCodec;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static com.facebook.airlift.json.JsonCodec.jsonCodec;
+import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
+import static org.testng.Assert.assertEquals;
+
+public class TestArrowSplit
+{
+ private final ArrowSplit split = new ArrowSplit("schemaName", "tableName",
+ "ticket".getBytes(), Arrays.asList("http://host"));
+
+ @Test
+ public void testNodes()
+ {
+ assertEquals(split.getNodeSelectionStrategy(), NO_PREFERENCE);
+ assertEquals(split.getPreferredNodes(null), Collections.emptyList());
+ }
+
+ @Test
+ public void testJsonRoundTrip()
+ {
+ JsonCodec codec = jsonCodec(ArrowSplit.class);
+ String json = codec.toJson(split);
+ ArrowSplit copy = codec.fromJson(json);
+ assertEquals(copy.getSchemaName(), split.getSchemaName());
+ assertEquals(copy.getTableName(), split.getTableName());
+ assertEquals(copy.getTicket(), split.getTicket());
+ assertEquals(copy.getLocationUrls(), split.getLocationUrls());
+ }
+}
diff --git a/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestFlightClient.java b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestFlightClient.java
new file mode 100644
index 0000000000000..5b2e3eea9cfaf
--- /dev/null
+++ b/presto-base-arrow-flight/src/test/java/com/facebook/plugin/arrow/TestFlightClient.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ConnectorSession;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.grpc.CredentialCallOption;
+import org.apache.arrow.memory.RootAllocator;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class TestFlightClient
+{
+ private static RootAllocator allocator;
+ private static FlightServer server;
+
+ private static Location serverLocation;
+
+ @BeforeClass
+ public void setup() throws Exception
+ {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ serverLocation = Location.forGrpcInsecure("127.0.0.1", 9443);
+ server = FlightServer.builder(allocator, serverLocation, new ArrowServer())
+ .build();
+ server.start();
+ System.out.println("Server listening on port " + server.getPort());
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception
+ {
+ server.close();
+ allocator.close();
+ }
+
+ @Test
+ public void testInitializeClient()
+ {
+ ArrowFlightConfig config = mock(ArrowFlightConfig.class);
+ when(config.getFlightServerSSLCertificate()).thenReturn(null);
+ when(config.getVerifyServer()).thenReturn(true);
+ ConnectorSession connectorSession = mock(ConnectorSession.class);
+ ArrowFlightClientHandler handler = new ArrowFlightClientHandler(config) {
+ @Override
+ protected CredentialCallOption getCallOptions(ConnectorSession connectorSession)
+ {
+ return null;
+ }
+ };
+ Optional uri = Optional.of("grpc://127.0.0.1:9443");
+ handler.getClient(uri);
+ assertNotNull(handler.getClient(uri));
+ }
+
+ @Test
+ public void testGetFlightInfo()
+ {
+ ArrowFlightConfig config = Mockito.mock(ArrowFlightConfig.class);
+ FlightInfo mockFlightInfo = mock(FlightInfo.class);
+ ArrowFlightClientHandler clientHandler = Mockito.mock(ArrowFlightClientHandler.class);
+ ArrowFlightRequest request = new TestArrowFlightRequest("schema", "table", "query");
+ ConnectorSession session = Mockito.mock(ConnectorSession.class);
+ when(clientHandler.getFlightInfo(request, session)).thenReturn(mockFlightInfo);
+ FlightInfo result = clientHandler.getFlightInfo(request, session);
+ assertEquals(mockFlightInfo, result);
+ }
+}
diff --git a/presto-docs/src/main/sphinx/connector.rst b/presto-docs/src/main/sphinx/connector.rst
index f07506b460cab..d337fe4ed12d1 100644
--- a/presto-docs/src/main/sphinx/connector.rst
+++ b/presto-docs/src/main/sphinx/connector.rst
@@ -9,6 +9,7 @@ from different data sources.
:maxdepth: 1
connector/accumulo
+ connector/base-arrow-flight
connector/bigquery
connector/blackhole
connector/cassandra
diff --git a/presto-docs/src/main/sphinx/connector/base-arrow-flight.rst b/presto-docs/src/main/sphinx/connector/base-arrow-flight.rst
new file mode 100644
index 0000000000000..5a310cecdd4c3
--- /dev/null
+++ b/presto-docs/src/main/sphinx/connector/base-arrow-flight.rst
@@ -0,0 +1,96 @@
+
+======================
+Arrow-flight Connector
+======================
+This connector allows querying multiple data sources that are supported by an Arrow Flight server.
+Apache Arrow enhances performance and efficiency in data-intensive applications through its columnar memory layout, zero-copy reads, vectorized execution, cross-language interoperability, rich data type support, and optimization for modern hardware. These features collectively reduce overhead, improve data processing speeds, and facilitate seamless data exchange between different systems and languages.
+
+Getting Started with base-arrow-module: Essential Abstract Methods for Developers
+---------------------------------------------------------------------------------
+To use the base-arrow-module, you need to implement certain abstract methods that are specific to your use case. Below are the required classes and their purposes:
+
+* ``ArrowFlightClientHandler.java``
+ This class is responsible for initializing the Flight client and retrieving Flight information from the Flight server. To authenticate the Flight server, you must implement the abstract method ``getCallOptions`` in ArrowFlightClientHandler, which returns the ``CredentialCallOption`` specific to your Flight server.
+
+* ``ArrowAbstractFlightRequest.java``
+ Implement this class to define the request data, including the data source type, connection properties, the number of partitions and other data required to interact with database.
+
+* ``ArrowAbstractMetadata.java``
+ To retrieve metadata (schema and table information), implement the abstract methods in the ArrowAbstractMetadata class.
+
+* ``ArrowAbstractSplitManager.java``
+ Extend the ArrowAbstractSplitManager class to implement the Arrow flight request, defining the Arrow split.
+
+* ``ArrowPlugin.java``
+ Register your connector name by extending the ArrowPlugin class.
+
+* ``ArrowFlightRequest``
+ The ``getCommand`` method in the ``ArrowFlightRequest`` interface should return a byte array for the flight request.
+
+
+Configuration
+-------------
+To configure the Arrow connector, create a catalog file
+in ``etc/catalog`` named, for example, ``arrowmariadb.properties``, to
+mount the Arrow-flight connector as the ``arrowmariadb`` catalog.
+Create the file with the following contents, replacing the
+connection properties as appropriate for your setup:
+
+
+.. code-block:: none
+
+
+ connector.name=
+ arrow-flight.server=
+ arrow-flight.server.port=
+
+
+
+Add other properties that are required for your Flight server to connect.
+
+========================================== ==============================================================
+Property Name Description
+========================================== ==============================================================
+``arrow-flight.server`` Endpoint of arrow-flight server
+``arrow-flight.server.port`` Flight server port
+``arrow-flight.server-ssl-certificate`` Pass ssl certificate
+``arrow-flight.server.verify`` To verify server
+``arrow-flight.server-ssl-enabled`` Port is ssl enabled
+========================================== ==============================================================
+
+Querying Arrow-Flight
+---------------------
+
+The Arrow-Flight connector provides schema for each supported *databases*.
+Example for MariaDB is shown below.
+To see the available schemas, run ``SHOW SCHEMAS``::
+
+ SHOW SCHEMAS FROM arrowmariadb;
+
+To view the tables in the MariaDB database named ``user``,
+run ``SHOW TABLES``::
+
+ SHOW TABLES FROM arrowmariadb.user;
+
+To see a list of the columns in the ``admin`` table in the ``user`` database,
+use either of the following commands::
+
+ DESCRIBE arrowmariadb.user.admin;
+ SHOW COLUMNS FROM arrowmariadb.user.admin;
+
+Finally, you can access the ``admin`` table in the ``user`` database::
+
+ SELECT * FROM arrowmariadb.user.admin;
+
+If you used a different name for your catalog properties file, use
+that catalog name instead of ``arrowmariadb`` in the above examples.
+
+
+Arrow-Flight Connector Limitations
+----------------------------------
+
+* SELECT and DESCRIBE queries are supported by this connector template. Implementing modules can add support for additional features.
+
+* Arrow-flight connector can query against only those datasources which are supported by Flight server.
+
+* The user should have the flight server running for the arrow-flight connector to work.