Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BT-5120-pagination-ergonomics] Implement queryPage and asyncQueryPage APIs.. #149

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 64 additions & 58 deletions src/main/java/com/fauna/client/FaunaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import com.fauna.exception.ClientException;
import com.fauna.exception.FaunaException;
import com.fauna.exception.ServiceException;
import com.fauna.query.AfterToken;
import com.fauna.query.QueryOptions;
import com.fauna.stream.StreamRequest;
import com.fauna.query.StreamTokenResponse;
import com.fauna.query.builder.Query;
import com.fauna.response.QueryResponse;
import com.fauna.response.QuerySuccess;
import com.fauna.codec.ParameterizedOf;
import com.fauna.types.Page;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
Expand All @@ -25,6 +27,14 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static com.fauna.client.PageIterator.PAGINATE_QUERY;
import static com.fauna.client.PageIterator.TOKEN_NAME;
import static com.fauna.codec.Generic.pageOf;
import static com.fauna.constants.ErrorMessages.QUERY_EXECUTION;
import static com.fauna.constants.ErrorMessages.QUERY_PAGE;
import static com.fauna.constants.ErrorMessages.STREAM_SUBSCRIPTION;
import static com.fauna.query.builder.Query.fql;

public abstract class FaunaClient {

public static final RetryStrategy DEFAULT_RETRY_STRATEGY = ExponentialBackoffStrategy.builder().build();
Expand Down Expand Up @@ -52,7 +62,9 @@ public Optional<Long> getLastTransactionTs() {
}

private static Optional<ServiceException> extractServiceException(Throwable throwable) {
if (throwable.getCause() instanceof ServiceException) {
if (throwable instanceof ServiceException) {
return Optional.of((ServiceException) throwable);
} else if (throwable.getCause() instanceof ServiceException) {
return Optional.of((ServiceException) throwable.getCause());
} else {
return Optional.empty();
Expand All @@ -78,6 +90,18 @@ private <T> Supplier<CompletableFuture<QuerySuccess<T>>> makeAsyncRequest(HttpCl
return () -> client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()).thenApply(body -> QueryResponse.parseResponse(body, codec)).whenComplete(this::completeRequest);
}

private <R> R completeAsync(CompletableFuture<R> future, String executionMessage) {
try {
return future.get();
} catch(ExecutionException | InterruptedException exc) {
if (exc.getCause() != null && exc.getCause() instanceof FaunaException) {
throw (FaunaException) exc.getCause();
} else {
throw new ClientException(executionMessage, exc);
}
}
}


//region Asynchronous API
/**
Expand Down Expand Up @@ -196,15 +220,7 @@ public <E> CompletableFuture<QuerySuccess<E>> asyncQuery(Query fql, Parameterize
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public QuerySuccess<Object> query(Query fql) throws FaunaException {
try {
return this.asyncQuery(fql, Object.class, null).get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof FaunaException) {
throw (FaunaException) e.getCause();
} else {
throw new ClientException("Unhandled exception.", e);
}
}
return completeAsync(asyncQuery(fql, Object.class, null), "Unable to execute query.");
}

/**
Expand All @@ -219,15 +235,7 @@ public QuerySuccess<Object> query(Query fql) throws FaunaException {
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <T> QuerySuccess<T> query(Query fql, Class<T> resultClass) throws FaunaException {
try {
return this.asyncQuery(fql, resultClass, null).get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof FaunaException) {
throw (FaunaException) e.getCause();
} else {
throw new ClientException("Unhandled exception.", e);
}
}
return completeAsync(asyncQuery(fql, resultClass, null), QUERY_EXECUTION);
}

/**
Expand All @@ -242,15 +250,7 @@ public <T> QuerySuccess<T> query(Query fql, Class<T> resultClass) throws FaunaEx
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> QuerySuccess<E> query(Query fql, ParameterizedOf<E> parameterizedType) throws FaunaException {
try {
return this.asyncQuery(fql, parameterizedType, null).get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof FaunaException) {
throw (FaunaException) e.getCause();
} else {
throw new ClientException("Unhandled exception.", e);
}
}
return completeAsync(asyncQuery(fql, parameterizedType), QUERY_EXECUTION);
}

/**
Expand All @@ -266,15 +266,7 @@ public <E> QuerySuccess<E> query(Query fql, ParameterizedOf<E> parameterizedType
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <T> QuerySuccess<T> query(Query fql, Class<T> resultClass, QueryOptions options) throws FaunaException {
try {
return this.asyncQuery(fql, resultClass, options).get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof FaunaException) {
throw (FaunaException) e.getCause();
} else {
throw new ClientException("Unhandled exception.", e);
}
}
return completeAsync(asyncQuery(fql, resultClass, options), QUERY_EXECUTION);
}

/**
Expand All @@ -290,15 +282,38 @@ public <T> QuerySuccess<T> query(Query fql, Class<T> resultClass, QueryOptions o
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> QuerySuccess<E> query(Query fql, ParameterizedOf<E> parameterizedType, QueryOptions options) throws FaunaException {
try {
return this.asyncQuery(fql, parameterizedType, options).get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof FaunaException) {
throw (FaunaException) e.getCause();
} else {
throw new ClientException("Unhandled exception.", e);
}
}
return completeAsync(asyncQuery(fql, parameterizedType, options), QUERY_EXECUTION);
}
//endregion

//region Query Page API

/**
* Sends a query to Fauna that retrieves the Page<E> for the given page token.
* @param after The page token (result of a previous paginated request).
* @param elementClass The expected class of the query result.
* @param options A (nullable) set of options to pass to the query.
* @return A CompletableFuture that returns a QuerySuccess with data of type Page<E>.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
* @param <E> The type of the elements of the page.
*/
public <E> CompletableFuture<QuerySuccess<Page<E>>> asyncQueryPage(
AfterToken after, Class<E> elementClass, QueryOptions options) {
return this.asyncQuery(PageIterator.buildPageQuery(after), pageOf(elementClass), options);
}

/**
* Sends a query to Fauna that retrieves the Page<E> for the given page token.
* @param after The page token (result of a previous paginated request).
* @param elementClass The expected class of the query result.
* @param options A (nullable) set of options to pass to the query.
* @return A QuerySuccess with data of type Page<E>.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
* @param <E> The type of the elements of the page.
*/
public <E> QuerySuccess<Page<E>> queryPage(
AfterToken after, Class<E> elementClass, QueryOptions options) {
return completeAsync(asyncQueryPage(after, elementClass, options), QUERY_PAGE);
}
//endregion

Expand All @@ -310,6 +325,7 @@ public <E> QuerySuccess<E> query(Query fql, ParameterizedOf<E> parameterizedType
* @param options A (nullable) set of options to pass to the query.
* @return QuerySuccess The successful query result.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
* @param <E> The type of the elements of the page.
*/
public <E> PageIterator<E> paginate(Query fql, Class<E> elementClass, QueryOptions options) {
return new PageIterator<>(this, fql, elementClass, options);
Expand Down Expand Up @@ -379,12 +395,7 @@ public <E> CompletableFuture<FaunaStream<E>> asyncStream(StreamRequest streamReq
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> FaunaStream<E> stream(StreamRequest streamRequest, Class<E> elementClass) {
try {
return this.asyncStream(streamRequest, elementClass).get();
} catch (InterruptedException | ExecutionException e) {
throw new ClientException("Unable to subscribe to stream.", e);
}

return completeAsync(asyncStream(streamRequest, elementClass), STREAM_SUBSCRIPTION);
}

/**
Expand Down Expand Up @@ -418,12 +429,7 @@ public <E> CompletableFuture<FaunaStream<E>> asyncStream(Query fql, Class<E> ele
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> FaunaStream<E> stream(Query fql, Class<E> elementClass) {
try {
return this.asyncStream(fql, elementClass).get();
} catch (InterruptedException | ExecutionException e) {
throw new ClientException("Unable to subscribe to stream.", e);
}

return completeAsync(asyncStream(fql, elementClass), STREAM_SUBSCRIPTION);
}
//endregion
}
12 changes: 8 additions & 4 deletions src/main/java/com/fauna/client/PageIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import com.fauna.exception.FaunaException;
import com.fauna.query.AfterToken;
import com.fauna.query.QueryOptions;
import com.fauna.query.builder.Query;
import com.fauna.response.QuerySuccess;
Expand All @@ -21,8 +22,8 @@
* @param <E>
*/
public class PageIterator<E> implements Iterator<Page<E>> {
private static final String TOKEN_NAME = "token";
private static final String PAGINATE_QUERY = "Set.paginate(${" + TOKEN_NAME + "})";
static final String TOKEN_NAME = "token";
static final String PAGINATE_QUERY = "Set.paginate(${" + TOKEN_NAME + "})";
private final FaunaClient client;
private final QueryOptions options;
private final PageOf<E> pageClass;
Expand All @@ -48,8 +49,11 @@ public boolean hasNext() {
return this.queryFuture != null;
}

private void doPaginatedQuery(String afterToken) {
this.queryFuture = client.asyncQuery(fql(PAGINATE_QUERY, Map.of(TOKEN_NAME, afterToken)), pageClass, options);
public static Query buildPageQuery(AfterToken afterToken) {
return fql(PAGINATE_QUERY, Map.of(TOKEN_NAME, afterToken.getToken()));
}
private void doPaginatedQuery(AfterToken afterToken) {
this.queryFuture = client.asyncQuery(PageIterator.buildPageQuery(afterToken), pageClass, options);
}

private void endPagination() {
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/fauna/constants/ErrorMessages.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.fauna.constants;

public class ErrorMessages {
public static final String QUERY_EXECUTION = "Unable to execute query.";
public static final String STREAM_SUBSCRIPTION = "Unable to subscribe to stream.";
public static final String QUERY_PAGE = "Unable to query page.";

}
2 changes: 2 additions & 0 deletions src/main/java/com/fauna/exception/ErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.fauna.response.QueryFailure;


import java.util.concurrent.ExecutionException;

import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/fauna/query/AfterToken.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.fauna.query;

import java.util.Optional;

public class AfterToken {
private final String token;

public AfterToken(String token) {
this.token = token;
}

public String getToken() {
return token;
}

public static Optional<AfterToken> fromString(String token) {
return Optional.ofNullable(token != null ? new AfterToken(token) : null);
}
}
6 changes: 4 additions & 2 deletions src/main/java/com/fauna/types/Page.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.fauna.types;

import com.fauna.query.AfterToken;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -22,8 +24,8 @@ public List<T> getData() {
return data;
}

public Optional<String> getAfter() {
return Optional.ofNullable(after);
public Optional<AfterToken> getAfter() {
return AfterToken.fromString(after);
}

@Override
Expand Down
14 changes: 7 additions & 7 deletions src/test/java/com/fauna/e2e/E2EPaginationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

import static com.fauna.query.builder.Query.fql;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -74,17 +72,19 @@ public void query_single_page_gets_wrapped_in_page() {

@Test
public void query_all_with_manual_pagination() {
// Demonstrate how a user could paginate without the paginate API.
// Demonstrate how a user could paginate without PageIterator.
PageOf<Product> pageOf = new PageOf<>(Product.class);
QuerySuccess<Page<Product>> first = client.query(fql("Product.all()"), pageOf);
Page<Product> latest = first.getData();
List<List<Product>> pages = new ArrayList<>();

pages.add(latest.getData());
while (latest.getAfter().isPresent()) {
QuerySuccess<Page<Product>> paged = client.query(fql("Set.paginate(${after})", Map.of("after", latest.getAfter())), pageOf);
latest = paged.getData();
pages.add(latest.getData());
while (latest != null) {
latest = latest.getAfter().map(after -> {
Page<Product> page = client.queryPage(after, Product.class, null).getData();
pages.add(page.getData());
return page;
}).orElse(null);
}
assertEquals(4, pages.size());
assertEquals(2, pages.get(3).size());
Expand Down
Loading