diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 841633f94cdba..bab00e6d70d4a 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -615,7 +615,7 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True, Scenario( "app_metadata_flight_info_endpoint", description="Ensure support FlightInfo and Endpoint app_metadata", - skip_testers={"JS", "C#", "Rust", "Java"} + skip_testers={"JS", "C#", "Rust"} ), Scenario( "flight_sql", diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java index ad78cfbd210d4..1967fe1d91c34 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightEndpoint.java @@ -23,6 +23,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -30,6 +31,7 @@ import org.apache.arrow.flight.impl.Flight; +import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; /** @@ -39,6 +41,7 @@ public class FlightEndpoint { private final List locations; private final Ticket ticket; private final Instant expirationTime; + private final byte[] appMetadata; /** * Constructs a new endpoint with no expiration time. @@ -54,13 +57,22 @@ public FlightEndpoint(Ticket ticket, Location... locations) { * Constructs a new endpoint with an expiration time. * * @param ticket A ticket that describe the key of a data stream. + * @param expirationTime (optional) When this endpoint expires. * @param locations The possible locations the stream can be retrieved from. */ public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locations) { + this(ticket, expirationTime, null, Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations)))); + } + + /** + * Private constructor with all parameters. Should only be called by Builder. + */ + private FlightEndpoint(Ticket ticket, Instant expirationTime, byte[] appMetadata, List locations) { Objects.requireNonNull(ticket); - this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations))); + this.locations = locations; this.expirationTime = expirationTime; this.ticket = ticket; + this.appMetadata = appMetadata; } /** @@ -77,6 +89,7 @@ public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locatio } else { this.expirationTime = null; } + this.appMetadata = (flt.getAppMetadata().size() == 0 ? null : flt.getAppMetadata().toByteArray()); this.ticket = new Ticket(flt.getTicket()); } @@ -92,6 +105,10 @@ public Optional getExpirationTime() { return Optional.ofNullable(expirationTime); } + public byte[] getAppMetadata() { + return appMetadata; + } + /** * Converts to the protocol buffer representation. */ @@ -111,6 +128,10 @@ Flight.FlightEndpoint toProtocol() { .build()); } + if (appMetadata != null) { + b.setAppMetadata(ByteString.copyFrom(appMetadata)); + } + return b.build(); } @@ -148,12 +169,13 @@ public boolean equals(Object o) { FlightEndpoint that = (FlightEndpoint) o; return locations.equals(that.locations) && ticket.equals(that.ticket) && - Objects.equals(expirationTime, that.expirationTime); + Objects.equals(expirationTime, that.expirationTime) && + Arrays.equals(appMetadata, that.appMetadata); } @Override public int hashCode() { - return Objects.hash(locations, ticket, expirationTime); + return Objects.hash(locations, ticket, expirationTime, Arrays.hashCode(appMetadata)); } @Override @@ -162,6 +184,59 @@ public String toString() { "locations=" + locations + ", ticket=" + ticket + ", expirationTime=" + (expirationTime == null ? "(none)" : expirationTime.toString()) + + ", appMetadata=" + (appMetadata == null ? "(none)" : Base64.getEncoder().encodeToString(appMetadata)) + '}'; } + + /** + * Create a builder for FlightEndpoint. + * + * @param ticket A ticket that describe the key of a data stream. + * @param locations The possible locations the stream can be retrieved from. + */ + public static Builder builder(Ticket ticket, Location... locations) { + return new Builder(ticket, locations); + } + + /** + * Builder for FlightEndpoint. + */ + public static final class Builder { + private final Ticket ticket; + private final List locations; + private Instant expirationTime = null; + private byte[] appMetadata = null; + + private Builder(Ticket ticket, Location... locations) { + this.ticket = ticket; + this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations))); + } + + /** + * Set expiration time for the endpoint. Default is null, which means don't expire. + * + * @param expirationTime (optional) When this endpoint expires. + */ + public Builder setExpirationTime(Instant expirationTime) { + this.expirationTime = expirationTime; + return this; + } + + /** + * Set the app metadata to send along with the flight. Default is null; + * + * @param appMetadata Metadata to send along with the flight + */ + public Builder setAppMetadata(byte[] appMetadata) { + this.appMetadata = appMetadata; + return this; + } + + /** + * Build FlightEndpoint object. + */ + public FlightEndpoint build() { + return new FlightEndpoint(ticket, expirationTime, appMetadata, locations); + } + } } diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java index d871f89465c83..b5279a304c865 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java @@ -23,6 +23,8 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -51,6 +53,7 @@ public class FlightInfo { private final long records; private final boolean ordered; private final IpcOption option; + private final byte[] appMetadata; /** * Constructs a new instance. @@ -94,6 +97,23 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, long records, boolean ordered, IpcOption option) { + this(schema, descriptor, endpoints, bytes, records, ordered, option, null); + } + + /** + * Constructs a new instance. + * + * @param schema The schema of the Flight + * @param descriptor An identifier for the Flight. + * @param endpoints A list of endpoints that have the flight available. + * @param bytes The number of bytes in the flight + * @param records The number of records in the flight. + * @param ordered Whether the endpoints in this flight are ordered. + * @param option IPC write options. + * @param appMetadata Metadata to send along with the flight + */ + public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints, long bytes, + long records, boolean ordered, IpcOption option, byte[] appMetadata) { Objects.requireNonNull(descriptor); Objects.requireNonNull(endpoints); if (schema != null) { @@ -106,6 +126,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List endpoints) { + return new Builder(schema, descriptor, endpoints); + } + + /** + * Builder for FlightInfo. + */ + public static final class Builder { + private final Schema schema; + private final FlightDescriptor descriptor; + private final List endpoints; + private long bytes = -1; + private long records = -1; + private boolean ordered = false; + private IpcOption option = IpcOption.DEFAULT; + private byte[] appMetadata = null; + + private Builder(Schema schema, FlightDescriptor descriptor, List endpoints) { + this.schema = schema; + this.descriptor = descriptor; + this.endpoints = endpoints; + } + + /** + * Set the number of bytes for the flight. Default to -1 for unknown. + * + * @param bytes The number of bytes in the flight + */ + public Builder setBytes(long bytes) { + this.bytes = bytes; + return this; + } + + /** + * Set the number of records for the flight. Default to -1 for unknown. + * + * @param records The number of records in the flight. + */ + public Builder setRecords(long records) { + this.records = records; + return this; + } + + /** + * Set whether the flight endpoints are ordered. Default is false. + * + * @param ordered Whether the endpoints in this flight are ordered. + */ + public Builder setOrdered(boolean ordered) { + this.ordered = ordered; + return this; + } + + /** + * Set IPC write options. Default is IpcOption.DEFAULT + * + * @param option IPC write options. + */ + public Builder setOption(IpcOption option) { + this.option = option; + return this; + } + + /** + * Set the app metadata to send along with the flight. Default is null. + * + * @param appMetadata Metadata to send along with the flight + */ + public Builder setAppMetadata(byte[] appMetadata) { + this.appMetadata = appMetadata; + return this; + } + + /** + * Build FlightInfo object. + */ + public FlightInfo build() { + return new FlightInfo(schema, descriptor, endpoints, bytes, records, ordered, option, appMetadata); + } + } } diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java index 238221f051a7e..41b3a4693e579 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java @@ -113,10 +113,13 @@ public void roundTripInfo() throws Exception { Field.nullable("a", new ArrowType.Int(32, true)), Field.nullable("b", new ArrowType.FixedSizeBinary(32)) ), metadata); - final FlightInfo info1 = new FlightInfo(schema, FlightDescriptor.path(), Collections.emptyList(), -1, -1); + final FlightInfo info1 = FlightInfo.builder(schema, FlightDescriptor.path(), Collections.emptyList()) + .setAppMetadata("foo".getBytes()).build(); final FlightInfo info2 = new FlightInfo(schema, FlightDescriptor.command(new byte[2]), - Collections.singletonList(new FlightEndpoint( - new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock"))), 200, 500); + Collections.singletonList( + FlightEndpoint.builder(new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock")) + .setAppMetadata("bar".getBytes()).build() + ), 200, 500); final FlightInfo info3 = new FlightInfo(schema, FlightDescriptor.path("a", "b"), Arrays.asList(new FlightEndpoint( new Ticket(new byte[10]), Location.forGrpcDomainSocket("/tmp/test.sock")), diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java index 691048fb03ed3..0e4669f29ce43 100644 --- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java +++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestFlightService.java @@ -28,6 +28,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.ipc.message.IpcOption; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -137,7 +138,8 @@ public void supportsNullSchemas() throws Exception @Override public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { - return new FlightInfo(null, descriptor, Collections.emptyList(), 0, 0); + return new FlightInfo(null, descriptor, Collections.emptyList(), + 0, 0, false, IpcOption.DEFAULT, "foo".getBytes()); } }; @@ -147,6 +149,7 @@ public FlightInfo getFlightInfo(CallContext context, FlightInfo flightInfo = client.getInfo(FlightDescriptor.path("test")); Assertions.assertEquals(Optional.empty(), flightInfo.getSchemaOptional()); Assertions.assertEquals(new Schema(Collections.emptyList()), flightInfo.getSchema()); + Assertions.assertArrayEquals(flightInfo.getAppMetadata(), "foo".getBytes()); Exception e = Assertions.assertThrows( FlightRuntimeException.class, diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java new file mode 100644 index 0000000000000..3220bb5a2d20f --- /dev/null +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/AppMetadataFlightInfoEndpointScenario.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.integration.tests; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +/** Test app_metadata in FlightInfo and FlightEndpoint. */ +final class AppMetadataFlightInfoEndpointScenario implements Scenario { + @Override + public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception { + return new AppMetadataFlightInfoEndpointProducer(); + } + + @Override + public void buildServer(FlightServer.Builder builder) throws Exception { + } + + @Override + public void client(BufferAllocator allocator, Location location, FlightClient client) throws Exception { + byte[] cmd = "foobar".getBytes(StandardCharsets.UTF_8); + FlightInfo info = client.getInfo(FlightDescriptor.command(cmd)); + IntegrationAssertions.assertEquals(info.getAppMetadata(), cmd); + IntegrationAssertions.assertEquals(info.getEndpoints().size(), 1); + IntegrationAssertions.assertEquals(info.getEndpoints().get(0).getAppMetadata(), cmd); + } + + /** producer for app_metadata test. */ + static class AppMetadataFlightInfoEndpointProducer extends NoOpFlightProducer { + @Override + public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { + byte[] cmd = descriptor.getCommand(); + + Schema schema = new Schema( + Collections.singletonList(Field.notNullable("number", Types.MinorType.UINT4.getType()))); + + List endpoints = Collections.singletonList( + FlightEndpoint.builder( + new Ticket("".getBytes(StandardCharsets.UTF_8))).setAppMetadata(cmd).build()); + + return FlightInfo.builder(schema, descriptor, endpoints).setAppMetadata(cmd).build(); + } + } +} + + diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java index 26629c650e30f..c61fd94a4d228 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/Scenarios.java @@ -49,6 +49,7 @@ private Scenarios() { scenarios.put("poll_flight_info", PollFlightInfoScenario::new); scenarios.put("flight_sql", FlightSqlScenario::new); scenarios.put("flight_sql:extension", FlightSqlExtensionScenario::new); + scenarios.put("app_metadata_flight_info_endpoint", AppMetadataFlightInfoEndpointScenario::new); } private static Scenarios getInstance() { diff --git a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java index cf65e16fac06f..477a56055cbbc 100644 --- a/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java +++ b/java/flight/flight-integration-tests/src/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java @@ -78,6 +78,11 @@ void flightSqlExtension() throws Exception { testScenario("flight_sql:extension"); } + @Test + void appMetadataFlightInfoEndpoint() throws Exception { + testScenario("app_metadata_flight_info_endpoint"); + } + void testScenario(String scenarioName) throws Exception { try (final BufferAllocator allocator = new RootAllocator()) { final FlightServer.Builder builder = FlightServer.builder()