feat: add row-based access of results to connection #18

Open · wants to merge 6 commits into base: main
Changes from 1 commit
2 changes: 1 addition & 1 deletion README.md
@@ -75,7 +75,7 @@ properties.put("clientSecret", "${clientSecret}");

The documentation for jwt authentication can be found [here][jwt flow].

-Instuctions to generate a private key can be found [here](#generating-a-private-key-for-jwt-authentication)
+Instructions to generate a private key can be found [here](#generating-a-private-key-for-jwt-authentication)

```java
Properties properties = new Properties();
5 changes: 3 additions & 2 deletions pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.salesforce.datacloud</groupId>
<artifactId>jdbc</artifactId>
-<version>0.23.0-SNAPSHOT</version>
+<version>0.24.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Salesforce Data Cloud JDBC Driver</name>
<description>Salesforce Data Cloud JDBC Driver</description>
@@ -416,8 +416,9 @@
<sourcepath>${project.build.directory}/delombok;${project.build.directory}/generated-sources/protobuf</sourcepath>
<source>${java.version}</source>
<quiet>true</quiet>
-<excludePackageNames>com.salesforce.hyperdb.grpc</excludePackageNames>
+<excludePackageNames>salesforce.cdp.hyperdb.v1.*</excludePackageNames>
<outputDirectory>${project.build.directory}/apidocs</outputDirectory>
<failOnWarnings>false</failOnWarnings>
</configuration>
<executions>
<execution>
DataCloudConnection.java
@@ -22,13 +22,15 @@
import com.salesforce.datacloud.jdbc.auth.AuthenticationSettings;
import com.salesforce.datacloud.jdbc.auth.DataCloudTokenProcessor;
import com.salesforce.datacloud.jdbc.auth.TokenProcessor;
import com.salesforce.datacloud.jdbc.core.partial.RowBased;
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import com.salesforce.datacloud.jdbc.http.ClientBuilder;
import com.salesforce.datacloud.jdbc.interceptor.AuthorizationHeaderInterceptor;
import com.salesforce.datacloud.jdbc.interceptor.DataspaceHeaderInterceptor;
import com.salesforce.datacloud.jdbc.interceptor.HyperExternalClientContextHeaderInterceptor;
import com.salesforce.datacloud.jdbc.interceptor.HyperWorkloadHeaderInterceptor;
import com.salesforce.datacloud.jdbc.interceptor.TracingHeadersInterceptor;
import com.salesforce.datacloud.jdbc.util.Unstable;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannelBuilder;
import java.sql.Array;
@@ -173,6 +175,24 @@ private DataCloudPreparedStatement getQueryPreparedStatement(String sql) {
return new DataCloudPreparedStatement(this, sql, new DefaultParameterManager());
}

/**
 * Use getQueryStatus to determine if your query is "ready", then use this method to get a collection of rows.
 * When using {@link RowBased.Mode#FULL_RANGE}, this method is not responsible for calculating the offset near the end of the available rows;
 * you must calculate the correct "pages" of offset and limit yourself.
 */
public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long limit, RowBased.Mode mode) {
val iterator = RowBased.of(executor, queryId, offset, limit, mode);
return StreamingResultSet.of(queryId, executor, iterator);
}

/**
 * Use this to determine when a given query is complete by filtering the responses and applying a subsequent findFirst().
 */
@Unstable
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
return executor.getQueryStatus(queryId);
}

@Override
public CallableStatement prepareCall(String sql) {
return null;
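Taken together, the two new connection methods support a poll-then-page workflow. Below is a minimal usage sketch, not part of the diff; it assumes `queryId` came from an already-executed statement, that `DataCloudResultSet` extends `java.sql.ResultSet`, and that `FULL_RANGE` is the right mode when the caller manages its own pages:

```java
import java.sql.SQLException;

import com.salesforce.datacloud.jdbc.core.DataCloudConnection;
import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;
import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
import com.salesforce.datacloud.jdbc.core.partial.RowBased;

final class RowBasedPagingSketch {
    // Wait until the server reports results, then read rows [offset, offset + limit).
    static void printPage(DataCloudConnection conn, String queryId, long offset, long limit)
            throws SQLException {
        // Per the javadoc: filter the status stream and findFirst() to learn readiness.
        conn.getQueryStatus(queryId)
                .filter(DataCloudQueryStatus::isResultsProduced)
                .findFirst()
                .orElseThrow(() -> new SQLException("no results produced for " + queryId));

        // FULL_RANGE does not clamp offset/limit near the end of the available
        // rows; the caller owns the paging math.
        DataCloudResultSet rs = conn.getRowBasedResultSet(queryId, offset, limit, RowBased.Mode.FULL_RANGE);
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }
    }
}
```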
DataCloudQueryStatus.java (new file)
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2024, Salesforce, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.salesforce.datacloud.jdbc.core;

import java.util.Optional;
import lombok.Value;
import lombok.val;
import salesforce.cdp.hyperdb.v1.QueryInfo;
import salesforce.cdp.hyperdb.v1.QueryStatus;

@Value
public class DataCloudQueryStatus {
public enum CompletionStatus {
RUNNING,
RESULTS_PRODUCED,
FINISHED
}

String queryId;

long chunkCount;

long rowCount;

double progress;

CompletionStatus completionStatus;

public boolean isResultsProduced() {
return completionStatus == CompletionStatus.RESULTS_PRODUCED;
}

public boolean isExecutionFinished() {
return completionStatus == CompletionStatus.FINISHED;
}

static Optional<DataCloudQueryStatus> of(QueryInfo queryInfo) {
return Optional.ofNullable(queryInfo).map(QueryInfo::getQueryStatus).map(DataCloudQueryStatus::of);
}

private static DataCloudQueryStatus of(QueryStatus s) {
val completionStatus = of(s.getCompletionStatus());
return new DataCloudQueryStatus(
s.getQueryId(), s.getChunkCount(), s.getRowCount(), s.getProgress(), completionStatus);
}

private static CompletionStatus of(QueryStatus.CompletionStatus completionStatus) {
switch (completionStatus) {
case RUNNING_OR_UNSPECIFIED:
return CompletionStatus.RUNNING;
case RESULTS_PRODUCED:
return CompletionStatus.RESULTS_PRODUCED;
case FINISHED:
return CompletionStatus.FINISHED;
default:
throw new IllegalArgumentException("Unknown completion status. status=" + completionStatus);
}
}
}
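The split between RESULTS_PRODUCED and FINISHED lets a consumer begin reading rows before execution fully completes. A small monitoring sketch (not from the diff; it assumes the status stream keeps emitting updates until a terminal status):

```java
import java.util.stream.Stream;

import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;

final class ProgressSketch {
    // Log each status until execution finishes; anyMatch short-circuits on the
    // first FINISHED status. The getters are generated by lombok's @Value.
    static void awaitFinished(Stream<DataCloudQueryStatus> statuses) {
        statuses.peek(s -> System.out.printf(
                        "query=%s progress=%.2f rows=%d chunks=%d%n",
                        s.getQueryId(), s.getProgress(), s.getRowCount(), s.getChunkCount()))
                .anyMatch(DataCloudQueryStatus::isExecutionFinished);
    }
}
```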
DataCloudStatement.java
Expand Up @@ -25,6 +25,7 @@
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import com.salesforce.datacloud.jdbc.util.Constants;
import com.salesforce.datacloud.jdbc.util.SqlErrorCodes;
import com.salesforce.datacloud.jdbc.util.Unstable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -73,16 +74,27 @@ protected HyperGrpcClientExecutor getQueryExecutor(QueryParam additionalQueryPar
return clientBuilder.queryTimeout(getQueryTimeout()).build();
}

-private void assertQueryReady() throws SQLException {
+private void assertQueryExecuted() throws SQLException {
if (listener == null) {
throw new DataCloudJDBCException("a query was not executed before attempting to access results");
}
}

private void assertQueryReady() throws SQLException {
assertQueryExecuted();

if (!listener.isReady()) {
throw new DataCloudJDBCException("query results were not ready");
}
}

@Unstable
public String getQueryId() throws SQLException {
assertQueryExecuted();

return listener.getQueryId();
}

public boolean isReady() {
return listener.isReady();
}
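The new accessor enables a detached workflow: capture the query id right after execution, then poll status and fetch rows by id later. A hedged sketch; the cast assumes createStatement yields a DataCloudStatement, which this diff does not show:

```java
import java.sql.SQLException;

import com.salesforce.datacloud.jdbc.core.DataCloudConnection;
import com.salesforce.datacloud.jdbc.core.DataCloudStatement;

final class QueryIdSketch {
    static String startQuery(DataCloudConnection conn, String sql) throws SQLException {
        // Hypothetical cast; adjust to however the driver exposes its statements.
        DataCloudStatement stmt = (DataCloudStatement) conn.createStatement();
        stmt.executeQuery(sql);
        // Throws if no query was executed on this statement (assertQueryExecuted).
        return stmt.getQueryId();
    }
}
```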
HyperGrpcClientExecutor.java
@@ -20,6 +20,8 @@
import com.salesforce.datacloud.jdbc.config.DriverVersion;
import com.salesforce.datacloud.jdbc.interceptor.QueryIdHeaderInterceptor;
import com.salesforce.datacloud.jdbc.util.PropertiesExtensions;
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
import com.salesforce.datacloud.jdbc.util.Unstable;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -29,8 +31,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
@@ -45,6 +49,7 @@
import salesforce.cdp.hyperdb.v1.QueryParam;
import salesforce.cdp.hyperdb.v1.QueryResult;
import salesforce.cdp.hyperdb.v1.QueryResultParam;
import salesforce.cdp.hyperdb.v1.ResultRange;

@Slf4j
@Builder(toBuilder = true)
@@ -135,6 +140,29 @@ public Iterator<QueryInfo> getQueryInfoStreaming(String queryId) {
return getStub(queryId).getQueryInfo(param);
}

@Unstable
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
val iterator = getQueryInfo(queryId);
return StreamUtilities.toStream(iterator)
.map(DataCloudQueryStatus::of)
.filter(Optional::isPresent)
.map(Optional::get);
}

public Iterator<QueryResult> getQueryResult(String queryId, long offset, long limit, boolean omitSchema) {
val rowRange =
ResultRange.newBuilder().setRowOffset(offset).setRowLimit(limit).setByteLimit(1024);

final QueryResultParam param = QueryResultParam.newBuilder()
.setQueryId(queryId)
.setResultRange(rowRange)
.setOmitSchema(omitSchema)
.setOutputFormat(OutputFormat.ARROW_IPC)
.build();

return getStub(queryId).getQueryResult(param);
}

public Iterator<QueryResult> getQueryResult(String queryId, long chunkId, boolean omitSchema) {
val param = getQueryResultParam(queryId, chunkId, omitSchema);
return getStub(queryId).getQueryResult(param);
@@ -161,12 +189,9 @@ private QueryResultParam getQueryResultParam(String queryId, long chunkId, boole
val builder = QueryResultParam.newBuilder()
.setQueryId(queryId)
.setChunkId(chunkId)
+.setOmitSchema(omitSchema)
.setOutputFormat(OutputFormat.ARROW_IPC);

-if (omitSchema) {
-    builder.setOmitSchema(true);
-}
-
return builder.build();
}

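Both getQueryStatus above and the new StreamingResultSet factory rely on StreamUtilities.toStream to adapt the blocking gRPC Iterator into a Stream. The utility's source is not part of this diff; a plausible minimal implementation would be:

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class StreamUtilitiesSketch {
    // Wrap an Iterator in an ordered, sequential, lazily-consumed Stream.
    static <T> Stream<T> toStream(Iterator<T> iterator) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
    }
}
```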
StreamingResultSet.java
@@ -18,11 +18,15 @@
import com.salesforce.datacloud.jdbc.core.listener.QueryStatusListener;
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
import com.salesforce.datacloud.jdbc.util.ArrowUtils;
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Iterator;
import java.util.TimeZone;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.arrow.memory.RootAllocator;
@@ -32,6 +36,7 @@
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.QueryState;
import salesforce.cdp.hyperdb.v1.QueryResult;

@Slf4j
public class StreamingResultSet extends AvaticaResultSet implements DataCloudResultSet {
@@ -51,6 +56,7 @@ private StreamingResultSet(
this.listener = listener;
}

@Deprecated
@SneakyThrows
public static StreamingResultSet of(String sql, QueryStatusListener listener) {
try {
@@ -73,6 +79,30 @@ public static StreamingResultSet of(String sql, QueryStatusListener listener) {
}
}

@SneakyThrows
public static StreamingResultSet of(
String queryId, HyperGrpcClientExecutor client, Iterator<QueryResult> iterator) {
try {
val channel = ExecuteQueryResponseChannel.of(StreamUtilities.toStream(iterator));
val reader = new ArrowStreamReader(channel, new RootAllocator(ROOT_ALLOCATOR_MB_FROM_V2));
val schemaRoot = reader.getVectorSchemaRoot();
val columns = ArrowUtils.toColumnMetaData(schemaRoot.getSchema().getFields());
val timezone = TimeZone.getDefault();
val state = new QueryState();
val signature = new Meta.Signature(
columns, null, Collections.emptyList(), Collections.emptyMap(), null, Meta.StatementType.SELECT);
val metadata = new AvaticaResultSetMetaData(null, null, signature);
val listener = new AlreadyReadyNoopListener(queryId);
val result = new StreamingResultSet(listener, null, state, signature, metadata, timezone, null);
val cursor = new ArrowStreamReaderCursor(reader);
result.execute2(cursor, columns);

return result;
} catch (Exception ex) {
throw QueryExceptionHandler.createException(QUERY_FAILURE + queryId, ex);
}
}

@Override
public String getQueryId() {
return listener.getQueryId();
@@ -89,4 +119,22 @@ public boolean isReady() {
}

private static final String QUERY_FAILURE = "Failed to execute query: ";

@Value
private static class AlreadyReadyNoopListener implements QueryStatusListener {
String queryId;
String status = "Status should be determined via DataCloudConnection::getQueryStatus";
String query = null;
boolean ready = true;

@Override
public DataCloudResultSet generateResultSet() {
return null;
}

@Override
public Stream<QueryResult> stream() {
return Stream.empty();
}
}
}
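This factory composes directly with the executor's new row-range overload, which appears to be roughly what DataCloudConnection.getRowBasedResultSet does via RowBased.of. A sketch under that assumption:

```java
import java.util.Iterator;

import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
import com.salesforce.datacloud.jdbc.core.HyperGrpcClientExecutor;
import com.salesforce.datacloud.jdbc.core.StreamingResultSet;
import salesforce.cdp.hyperdb.v1.QueryResult;

final class DirectResultSetSketch {
    // Fetch rows [0, 100) including the schema message, then wrap them in a result set.
    static DataCloudResultSet firstHundredRows(HyperGrpcClientExecutor executor, String queryId) {
        Iterator<QueryResult> rows = executor.getQueryResult(queryId, 0, 100, false);
        return StreamingResultSet.of(queryId, executor, rows);
    }
}
```

The AlreadyReadyNoopListener exists so result sets built this way can still answer getQueryId() and isReady() without a live QueryStatusListener; its stream() and generateResultSet() are intentionally inert (empty stream, null) on this path.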
QueryStatusListener.java
@@ -20,6 +20,7 @@
import java.util.stream.Stream;
import salesforce.cdp.hyperdb.v1.QueryResult;

@Deprecated
public interface QueryStatusListener {
String BEFORE_READY = "Results were requested before ready";
