Skip to content

Commit

Permalink
Implement queryPage and asyncQueryPage APIs, along with some refactor…
Browse files Browse the repository at this point in the history
…ing and modified test.
  • Loading branch information
David Griffin committed Sep 23, 2024
1 parent 605384a commit d8d7b9f
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 71 deletions.
122 changes: 64 additions & 58 deletions src/main/java/com/fauna/client/FaunaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import com.fauna.exception.ClientException;
import com.fauna.exception.FaunaException;
import com.fauna.exception.ServiceException;
import com.fauna.query.AfterToken;
import com.fauna.query.QueryOptions;
import com.fauna.stream.StreamRequest;
import com.fauna.query.StreamTokenResponse;
import com.fauna.query.builder.Query;
import com.fauna.response.QueryResponse;
import com.fauna.response.QuerySuccess;
import com.fauna.codec.ParameterizedOf;
import com.fauna.types.Page;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
Expand All @@ -25,6 +27,14 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static com.fauna.client.PageIterator.PAGINATE_QUERY;
import static com.fauna.client.PageIterator.TOKEN_NAME;
import static com.fauna.codec.Generic.pageOf;
import static com.fauna.constants.ErrorMessages.QUERY_EXECUTION;
import static com.fauna.constants.ErrorMessages.QUERY_PAGE;
import static com.fauna.constants.ErrorMessages.STREAM_SUBSCRIPTION;
import static com.fauna.query.builder.Query.fql;

public abstract class FaunaClient {

public static final RetryStrategy DEFAULT_RETRY_STRATEGY = ExponentialBackoffStrategy.builder().build();
Expand Down Expand Up @@ -52,7 +62,9 @@ public Optional<Long> getLastTransactionTs() {
}

private static Optional<ServiceException> extractServiceException(Throwable throwable) {
if (throwable.getCause() instanceof ServiceException) {
if (throwable instanceof ServiceException) {
return Optional.of((ServiceException) throwable);
} else if (throwable.getCause() instanceof ServiceException) {
return Optional.of((ServiceException) throwable.getCause());
} else {
return Optional.empty();
Expand All @@ -78,6 +90,18 @@ private <T> Supplier<CompletableFuture<QuerySuccess<T>>> makeAsyncRequest(HttpCl
return () -> client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()).thenApply(body -> QueryResponse.parseResponse(body, codec)).whenComplete(this::completeRequest);
}

private <R> R completeAsync(CompletableFuture<R> future, String executionMessage) {
try {
return future.get();
} catch(ExecutionException | InterruptedException exc) {
if (exc.getCause() != null && exc.getCause() instanceof FaunaException) {
throw (FaunaException) exc.getCause();
} else {
throw new ClientException(executionMessage, exc);
}
}
}


//region Asynchronous API
/**
Expand Down Expand Up @@ -196,15 +220,7 @@ public <E> CompletableFuture<QuerySuccess<E>> asyncQuery(Query fql, Parameterize
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public QuerySuccess<Object> query(Query fql) throws FaunaException {
try {
return this.asyncQuery(fql, Object.class, null).get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof FaunaException) {
throw (FaunaException) e.getCause();
} else {
throw new ClientException("Unhandled exception.", e);
}
}
return completeAsync(asyncQuery(fql, Object.class, null), "Unable to execute query.");
}

/**
Expand All @@ -219,15 +235,7 @@ public QuerySuccess<Object> query(Query fql) throws FaunaException {
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <T> QuerySuccess<T> query(Query fql, Class<T> resultClass) throws FaunaException {
try {
return this.asyncQuery(fql, resultClass, null).get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof FaunaException) {
throw (FaunaException) e.getCause();
} else {
throw new ClientException("Unhandled exception.", e);
}
}
return completeAsync(asyncQuery(fql, resultClass, null), QUERY_EXECUTION);
}

/**
Expand All @@ -242,15 +250,7 @@ public <T> QuerySuccess<T> query(Query fql, Class<T> resultClass) throws FaunaEx
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> QuerySuccess<E> query(Query fql, ParameterizedOf<E> parameterizedType) throws FaunaException {
try {
return this.asyncQuery(fql, parameterizedType, null).get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof FaunaException) {
throw (FaunaException) e.getCause();
} else {
throw new ClientException("Unhandled exception.", e);
}
}
return completeAsync(asyncQuery(fql, parameterizedType), QUERY_EXECUTION);
}

/**
Expand All @@ -266,15 +266,7 @@ public <E> QuerySuccess<E> query(Query fql, ParameterizedOf<E> parameterizedType
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <T> QuerySuccess<T> query(Query fql, Class<T> resultClass, QueryOptions options) throws FaunaException {
try {
return this.asyncQuery(fql, resultClass, options).get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof FaunaException) {
throw (FaunaException) e.getCause();
} else {
throw new ClientException("Unhandled exception.", e);
}
}
return completeAsync(asyncQuery(fql, resultClass, options), QUERY_EXECUTION);
}

/**
Expand All @@ -290,15 +282,38 @@ public <T> QuerySuccess<T> query(Query fql, Class<T> resultClass, QueryOptions o
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> QuerySuccess<E> query(Query fql, ParameterizedOf<E> parameterizedType, QueryOptions options) throws FaunaException {
try {
return this.asyncQuery(fql, parameterizedType, options).get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof FaunaException) {
throw (FaunaException) e.getCause();
} else {
throw new ClientException("Unhandled exception.", e);
}
}
return completeAsync(asyncQuery(fql, parameterizedType, options), QUERY_EXECUTION);
}
//endregion

//region Query Page API

/**
* Sends a query to Fauna that retrieves the Page<E> for the given page token.
* @param after The page token (result of a previous paginated request).
* @param elementClass The expected class of the query result.
* @param options A (nullable) set of options to pass to the query.
* @return A CompletableFuture that returns a QuerySuccess with data of type Page<E>.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
* @param <E> The type of the elements of the page.
*/
public <E> CompletableFuture<QuerySuccess<Page<E>>> asyncQueryPage(
AfterToken after, Class<E> elementClass, QueryOptions options) {
return this.asyncQuery(PageIterator.buildPageQuery(after), pageOf(elementClass), options);
}

/**
* Sends a query to Fauna that retrieves the Page<E> for the given page token.
* @param after The page token (result of a previous paginated request).
* @param elementClass The expected class of the query result.
* @param options A (nullable) set of options to pass to the query.
* @return A QuerySuccess with data of type Page<E>.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
* @param <E> The type of the elements of the page.
*/
public <E> QuerySuccess<Page<E>> queryPage(
AfterToken after, Class<E> elementClass, QueryOptions options) {
return completeAsync(asyncQueryPage(after, elementClass, options), QUERY_PAGE);
}
//endregion

Expand All @@ -310,6 +325,7 @@ public <E> QuerySuccess<E> query(Query fql, ParameterizedOf<E> parameterizedType
* @param options A (nullable) set of options to pass to the query.
* @return QuerySuccess The successful query result.
* @throws FaunaException If the query does not succeed, an exception will be thrown.
* @param <E> The type of the elements of the page.
*/
public <E> PageIterator<E> paginate(Query fql, Class<E> elementClass, QueryOptions options) {
return new PageIterator<>(this, fql, elementClass, options);
Expand Down Expand Up @@ -379,12 +395,7 @@ public <E> CompletableFuture<FaunaStream<E>> asyncStream(StreamRequest streamReq
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> FaunaStream<E> stream(StreamRequest streamRequest, Class<E> elementClass) {
try {
return this.asyncStream(streamRequest, elementClass).get();
} catch (InterruptedException | ExecutionException e) {
throw new ClientException("Unable to subscribe to stream.", e);
}

return completeAsync(asyncStream(streamRequest, elementClass), STREAM_SUBSCRIPTION);
}

/**
Expand Down Expand Up @@ -418,12 +429,7 @@ public <E> CompletableFuture<FaunaStream<E>> asyncStream(Query fql, Class<E> ele
* @throws FaunaException If the query does not succeed, an exception will be thrown.
*/
public <E> FaunaStream<E> stream(Query fql, Class<E> elementClass) {
try {
return this.asyncStream(fql, elementClass).get();
} catch (InterruptedException | ExecutionException e) {
throw new ClientException("Unable to subscribe to stream.", e);
}

return completeAsync(asyncStream(fql, elementClass), STREAM_SUBSCRIPTION);
}
//endregion
}
12 changes: 8 additions & 4 deletions src/main/java/com/fauna/client/PageIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import com.fauna.exception.FaunaException;
import com.fauna.query.AfterToken;
import com.fauna.query.QueryOptions;
import com.fauna.query.builder.Query;
import com.fauna.response.QuerySuccess;
Expand All @@ -21,8 +22,8 @@
* @param <E>
*/
public class PageIterator<E> implements Iterator<Page<E>> {
private static final String TOKEN_NAME = "token";
private static final String PAGINATE_QUERY = "Set.paginate(${" + TOKEN_NAME + "})";
static final String TOKEN_NAME = "token";
static final String PAGINATE_QUERY = "Set.paginate(${" + TOKEN_NAME + "})";
private final FaunaClient client;
private final QueryOptions options;
private final PageOf<E> pageClass;
Expand All @@ -48,8 +49,11 @@ public boolean hasNext() {
return this.queryFuture != null;
}

private void doPaginatedQuery(String afterToken) {
this.queryFuture = client.asyncQuery(fql(PAGINATE_QUERY, Map.of(TOKEN_NAME, afterToken)), pageClass, options);
public static Query buildPageQuery(AfterToken afterToken) {
return fql(PAGINATE_QUERY, Map.of(TOKEN_NAME, afterToken.getToken()));
}
private void doPaginatedQuery(AfterToken afterToken) {
this.queryFuture = client.asyncQuery(PageIterator.buildPageQuery(afterToken), pageClass, options);
}

private void endPagination() {
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/fauna/constants/ErrorMessages.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.fauna.constants;

public class ErrorMessages {
public static final String QUERY_EXECUTION = "Unable to execute query.";
public static final String STREAM_SUBSCRIPTION = "Unable to subscribe to stream.";
public static final String QUERY_PAGE = "Unable to query page.";

}
2 changes: 2 additions & 0 deletions src/main/java/com/fauna/exception/ErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.fauna.response.QueryFailure;


import java.util.concurrent.ExecutionException;

import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/fauna/query/AfterToken.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.fauna.query;

import java.util.Optional;

public class AfterToken {
private final String token;

public AfterToken(String token) {
this.token = token;
}

public String getToken() {
return token;
}

public static Optional<AfterToken> fromString(String token) {
return Optional.ofNullable(token != null ? new AfterToken(token) : null);
}
}
6 changes: 4 additions & 2 deletions src/main/java/com/fauna/types/Page.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.fauna.types;

import com.fauna.query.AfterToken;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -22,8 +24,8 @@ public List<T> getData() {
return data;
}

public Optional<String> getAfter() {
return Optional.ofNullable(after);
public Optional<AfterToken> getAfter() {
return AfterToken.fromString(after);
}

@Override
Expand Down
14 changes: 7 additions & 7 deletions src/test/java/com/fauna/e2e/E2EPaginationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

import static com.fauna.query.builder.Query.fql;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -74,17 +72,19 @@ public void query_single_page_gets_wrapped_in_page() {

@Test
public void query_all_with_manual_pagination() {
// Demonstrate how a user could paginate without the paginate API.
// Demonstrate how a user could paginate without PageIterator.
PageOf<Product> pageOf = new PageOf<>(Product.class);
QuerySuccess<Page<Product>> first = client.query(fql("Product.all()"), pageOf);
Page<Product> latest = first.getData();
List<List<Product>> pages = new ArrayList<>();

pages.add(latest.getData());
while (latest.getAfter().isPresent()) {
QuerySuccess<Page<Product>> paged = client.query(fql("Set.paginate(${after})", Map.of("after", latest.getAfter())), pageOf);
latest = paged.getData();
pages.add(latest.getData());
while (latest != null) {
latest = latest.getAfter().map(after -> {
Page<Product> page = client.queryPage(after, Product.class, null).getData();
pages.add(page.getData());
return page;
}).orElse(null);
}
assertEquals(4, pages.size());
assertEquals(2, pages.get(3).size());
Expand Down

0 comments on commit d8d7b9f

Please sign in to comment.