Skip to content

Commit

Permalink
feat: add row-based access of results to connection
Browse files Browse the repository at this point in the history
  • Loading branch information
jschneidereit committed Feb 12, 2025
1 parent a1e5d3c commit 082595e
Show file tree
Hide file tree
Showing 13 changed files with 701 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ properties.put("clientSecret", "${clientSecret}");

The documentation for jwt authentication can be found [here][jwt flow].

Instuctions to generate a private key can be found [here](#generating-a-private-key-for-jwt-authentication)
Instructions to generate a private key can be found [here](#generating-a-private-key-for-jwt-authentication)

```java
Properties properties = new Properties();
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.salesforce.datacloud</groupId>
<artifactId>jdbc</artifactId>
<version>0.23.0-SNAPSHOT</version>
<version>0.24.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Salesforce Data Cloud JDBC Driver</name>
<description>Salesforce Data Cloud JDBC Driver</description>
Expand Down Expand Up @@ -416,7 +416,7 @@
<sourcepath>${project.build.directory}/delombok;${project.build.directory}/generated-sources/protobuf</sourcepath>
<source>${java.version}</source>
<quiet>true</quiet>
<excludePackageNames>com.salesforce.hyperdb.grpc</excludePackageNames>
<excludePackageNames>salesforce.cdp.hyperdb.v1</excludePackageNames>
<outputDirectory>${project.build.directory}/apidocs</outputDirectory>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.salesforce.datacloud.jdbc.auth.AuthenticationSettings;
import com.salesforce.datacloud.jdbc.auth.DataCloudTokenProcessor;
import com.salesforce.datacloud.jdbc.auth.TokenProcessor;
import com.salesforce.datacloud.jdbc.core.partial.RowBased;
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import com.salesforce.datacloud.jdbc.http.ClientBuilder;
import com.salesforce.datacloud.jdbc.interceptor.AuthorizationHeaderInterceptor;
import com.salesforce.datacloud.jdbc.interceptor.DataspaceHeaderInterceptor;
import com.salesforce.datacloud.jdbc.interceptor.HyperExternalClientContextHeaderInterceptor;
import com.salesforce.datacloud.jdbc.interceptor.HyperWorkloadHeaderInterceptor;
import com.salesforce.datacloud.jdbc.interceptor.TracingHeadersInterceptor;
import com.salesforce.datacloud.jdbc.util.Unstable;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannelBuilder;
import java.sql.Array;
Expand Down Expand Up @@ -173,6 +175,24 @@ private DataCloudPreparedStatement getQueryPreparedStatement(String sql) {
return new DataCloudPreparedStatement(this, sql, new DefaultParameterManager());
}

/**
 * Fetches a page of rows for a query that was already submitted to the server.
 * Use {@link #getQueryStatus(String)} first to determine that the query is "ready".
 * When using {@link RowBased.Mode#FULL_RANGE} this method does not clamp the offset near the
 * end of the available rows; the caller must compute correct "pages" of offset and limit.
 */
public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long limit, RowBased.Mode mode) {
    val rows = RowBased.of(executor, queryId, offset, limit, mode);
    return StreamingResultSet.of(queryId, executor, rows);
}

/**
 * Streams status updates for the given query id; filter the responses and use a subsequent
 * findFirst() to determine when the query is complete.
 */
@Unstable
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
    val statuses = executor.getQueryStatus(queryId);
    return statuses;
}

@Override
public CallableStatement prepareCall(String sql) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2024, Salesforce, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.salesforce.datacloud.jdbc.core;

import java.util.Optional;
import lombok.Value;
import lombok.val;
import salesforce.cdp.hyperdb.v1.QueryInfo;
import salesforce.cdp.hyperdb.v1.QueryStatus;

/**
 * Immutable snapshot of a query's server-side status, mapped from the gRPC
 * {@link QueryStatus} payload of a {@link QueryInfo} response.
 */
@Value
public class DataCloudQueryStatus {
    /** Driver-facing view of the server's completion state. */
    public enum CompletionStatus {
        RUNNING,
        RESULTS_PRODUCED,
        FINISHED
    }

    String queryId;

    long chunkCount;

    long rowCount;

    double progress;

    CompletionStatus completionStatus;

    /** True once the server has produced results (execution may still be running). */
    public boolean isResultsProduced() {
        return CompletionStatus.RESULTS_PRODUCED == completionStatus;
    }

    /** True once the server reports execution fully finished. */
    public boolean isExecutionFinished() {
        return CompletionStatus.FINISHED == completionStatus;
    }

    /** Maps a raw QueryInfo response; empty when the response carries no status payload. */
    static Optional<DataCloudQueryStatus> of(QueryInfo queryInfo) {
        val status = queryInfo == null ? null : queryInfo.getQueryStatus();
        return status == null ? Optional.empty() : Optional.of(of(status));
    }

    private static DataCloudQueryStatus of(QueryStatus status) {
        return new DataCloudQueryStatus(
                status.getQueryId(),
                status.getChunkCount(),
                status.getRowCount(),
                status.getProgress(),
                of(status.getCompletionStatus()));
    }

    private static CompletionStatus of(QueryStatus.CompletionStatus completionStatus) {
        if (completionStatus == QueryStatus.CompletionStatus.RUNNING_OR_UNSPECIFIED) {
            return CompletionStatus.RUNNING;
        }
        if (completionStatus == QueryStatus.CompletionStatus.RESULTS_PRODUCED) {
            return CompletionStatus.RESULTS_PRODUCED;
        }
        if (completionStatus == QueryStatus.CompletionStatus.FINISHED) {
            return CompletionStatus.FINISHED;
        }
        throw new IllegalArgumentException("Unknown completion status. status=" + completionStatus);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import com.salesforce.datacloud.jdbc.util.Constants;
import com.salesforce.datacloud.jdbc.util.SqlErrorCodes;
import com.salesforce.datacloud.jdbc.util.Unstable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -73,16 +74,27 @@ protected HyperGrpcClientExecutor getQueryExecutor(QueryParam additionalQueryPar
return clientBuilder.queryTimeout(getQueryTimeout()).build();
}

private void assertQueryReady() throws SQLException {
/** Guards result access: the listener is only created when a query is executed. */
private void assertQueryExecuted() throws SQLException {
    if (listener != null) {
        return;
    }
    throw new DataCloudJDBCException("a query was not executed before attempting to access results");
}

/** Guards result access: a query must have been executed and its results must be available. */
private void assertQueryReady() throws SQLException {
    assertQueryExecuted();

    if (listener.isReady()) {
        return;
    }
    throw new DataCloudJDBCException("query results were not ready");
}

/**
 * Returns the server-assigned id of the executed query, for use with the row-based result APIs.
 *
 * @throws SQLException if no query has been executed on this statement yet
 */
@Unstable
public String getQueryId() throws SQLException {
    assertQueryExecuted();

    return listener.getQueryId();
}

/**
 * Reports whether query results are ready to be consumed.
 *
 * @return false when no query has been executed yet or results are not yet available.
 *         Previously this threw a NullPointerException before execution; the null guard matches
 *         the contract of {@code assertQueryExecuted}, which shows {@code listener} starts null.
 */
public boolean isReady() {
    return listener != null && listener.isReady();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.salesforce.datacloud.jdbc.config.DriverVersion;
import com.salesforce.datacloud.jdbc.interceptor.QueryIdHeaderInterceptor;
import com.salesforce.datacloud.jdbc.util.PropertiesExtensions;
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
import com.salesforce.datacloud.jdbc.util.Unstable;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
Expand All @@ -29,8 +31,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -45,6 +49,7 @@
import salesforce.cdp.hyperdb.v1.QueryParam;
import salesforce.cdp.hyperdb.v1.QueryResult;
import salesforce.cdp.hyperdb.v1.QueryResultParam;
import salesforce.cdp.hyperdb.v1.ResultRange;

@Slf4j
@Builder(toBuilder = true)
Expand Down Expand Up @@ -135,6 +140,29 @@ public Iterator<QueryInfo> getQueryInfoStreaming(String queryId) {
return getStub(queryId).getQueryInfo(param);
}

/**
 * Streams parsed status updates for the given query by adapting the streaming getQueryInfo
 * call; responses that carry no status payload are dropped.
 */
@Unstable
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
    return StreamUtilities.toStream(getQueryInfo(queryId))
            .map(DataCloudQueryStatus::of)
            .filter(Optional::isPresent)
            .map(Optional::get);
}

/**
 * Requests a row range of results for the given query in Arrow IPC format.
 *
 * @param queryId    the server-assigned query id
 * @param offset     zero-based row offset of the first row to return
 * @param limit      maximum number of rows to return
 * @param omitSchema when true, the Arrow schema message is omitted from the response stream
 */
public Iterator<QueryResult> getQueryResult(String queryId, long offset, long limit, boolean omitSchema) {
    // NOTE(review): the 1024 byte limit looks small for a row range — confirm with the Hyper API
    // whether this caps each response message rather than the total range.
    val range = ResultRange.newBuilder()
            .setRowOffset(offset)
            .setRowLimit(limit)
            .setByteLimit(1024);

    val param = QueryResultParam.newBuilder()
            .setQueryId(queryId)
            .setResultRange(range)
            .setOmitSchema(omitSchema)
            .setOutputFormat(OutputFormat.ARROW_IPC)
            .build();

    return getStub(queryId).getQueryResult(param);
}

public Iterator<QueryResult> getQueryResult(String queryId, long chunkId, boolean omitSchema) {
val param = getQueryResultParam(queryId, chunkId, omitSchema);
return getStub(queryId).getQueryResult(param);
Expand All @@ -161,12 +189,9 @@ private QueryResultParam getQueryResultParam(String queryId, long chunkId, boole
val builder = QueryResultParam.newBuilder()
.setQueryId(queryId)
.setChunkId(chunkId)
.setOmitSchema(omitSchema)
.setOutputFormat(OutputFormat.ARROW_IPC);

if (omitSchema) {
builder.setOmitSchema(true);
}

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
import com.salesforce.datacloud.jdbc.core.listener.QueryStatusListener;
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
import com.salesforce.datacloud.jdbc.util.ArrowUtils;
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Iterator;
import java.util.TimeZone;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.arrow.memory.RootAllocator;
Expand All @@ -32,6 +36,7 @@
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.QueryState;
import salesforce.cdp.hyperdb.v1.QueryResult;

@Slf4j
public class StreamingResultSet extends AvaticaResultSet implements DataCloudResultSet {
Expand All @@ -51,6 +56,7 @@ private StreamingResultSet(
this.listener = listener;
}

@Deprecated
@SneakyThrows
public static StreamingResultSet of(String sql, QueryStatusListener listener) {
try {
Expand All @@ -73,6 +79,30 @@ public static StreamingResultSet of(String sql, QueryStatusListener listener) {
}
}

/**
 * Builds a result set over the row stream of a query that already ran on the server (used by the
 * connection's row-based access). Reads the Arrow IPC stream's schema up front to derive column
 * metadata, then wraps the stream in an Avatica cursor.
 *
 * NOTE(review): the {@code client} parameter is unused in this body — confirm whether it is kept
 * for signature symmetry or should be removed.
 */
@SneakyThrows
public static StreamingResultSet of(
        String queryId, HyperGrpcClientExecutor client, Iterator<QueryResult> iterator) {
    try {
        // Adapt the gRPC QueryResult iterator into a readable channel of Arrow IPC bytes.
        val channel = ExecuteQueryResponseChannel.of(StreamUtilities.toStream(iterator));
        val reader = new ArrowStreamReader(channel, new RootAllocator(ROOT_ALLOCATOR_MB_FROM_V2));
        // Reading the schema root up front yields the fields Avatica needs for column metadata.
        val schemaRoot = reader.getVectorSchemaRoot();
        val columns = ArrowUtils.toColumnMetaData(schemaRoot.getSchema().getFields());
        val timezone = TimeZone.getDefault();
        val state = new QueryState();
        val signature = new Meta.Signature(
                columns, null, Collections.emptyList(), Collections.emptyMap(), null, Meta.StatementType.SELECT);
        val metadata = new AvaticaResultSetMetaData(null, null, signature);
        // The query already executed elsewhere; this listener only reports the id and "ready".
        val listener = new AlreadyReadyNoopListener(queryId);
        val result = new StreamingResultSet(listener, null, state, signature, metadata, timezone, null);
        val cursor = new ArrowStreamReaderCursor(reader);
        result.execute2(cursor, columns);

        return result;
    } catch (Exception ex) {
        throw QueryExceptionHandler.createException(QUERY_FAILURE + queryId, ex);
    }
}

@Override
public String getQueryId() {
return listener.getQueryId();
Expand All @@ -89,4 +119,22 @@ public boolean isReady() {
}

private static final String QUERY_FAILURE = "Failed to execute query: ";

/**
 * Listener used when a result set is built from an already-executed query id: it always reports
 * ready, carries no query text, and owns no result stream of its own.
 */
@Value
private static class AlreadyReadyNoopListener implements QueryStatusListener {
    String queryId;
    // Placeholder text only; real status must be fetched via the connection.
    String status = "Status should be determined via DataCloudConnection::getStatus";
    String query = null;
    boolean ready = true;

    /** Not supported here — the result set was already constructed around an existing stream. */
    @Override
    public DataCloudResultSet generateResultSet() {
        return null;
    }

    /** This listener owns no server stream. */
    @Override
    public Stream<QueryResult> stream() {
        return Stream.empty();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.stream.Stream;
import salesforce.cdp.hyperdb.v1.QueryResult;

@Deprecated
public interface QueryStatusListener {
String BEFORE_READY = "Results were requested before ready";

Expand Down
Loading

0 comments on commit 082595e

Please sign in to comment.