Skip to content

Commit

Permalink
Update last_txn_ts on ServiceException.
Browse files Browse the repository at this point in the history
  • Loading branch information
David Griffin committed Sep 19, 2024
1 parent 1429f43 commit 5908704
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 18 deletions.
33 changes: 25 additions & 8 deletions src/main/java/com/fauna/client/FaunaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.fauna.codec.DefaultCodecRegistry;
import com.fauna.exception.ClientException;
import com.fauna.exception.FaunaException;
import com.fauna.exception.ServiceException;
import com.fauna.query.QueryOptions;
import com.fauna.stream.StreamRequest;
import com.fauna.query.StreamTokenResponse;
Expand Down Expand Up @@ -54,13 +55,29 @@ private static <T> Supplier<CompletableFuture<QuerySuccess<T>>> makeAsyncRequest
return () -> client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()).thenApply(body -> QueryResponse.parseResponse(body, codec));
}

public <T> QuerySuccess<T> updateTxnTs(QuerySuccess<T> success) {
// TODO: handleQueryFailure.
Long newTs = success.getLastSeenTxn();
private static Optional<ServiceException> extractServiceException(Throwable throwable) {
if (throwable.getCause() instanceof FaunaException) {
return Optional.of((ServiceException) throwable.getCause());
} else {
return Optional.empty();
}
}

private void updateTs(QueryResponse resp) {
Long newTs = resp.getLastSeenTxn();
if (newTs != null) {
this.lastTransactionTs.updateAndGet(oldTs -> newTs > oldTs ? newTs : oldTs );
}
return success;
}

private <T> QuerySuccess<T> completeRequest(QuerySuccess<T> success, Throwable throwable) {
if (success != null) {
updateTs(success);
return success;
} else if (throwable != null) {
extractServiceException(throwable).ifPresent(exc -> updateTs(exc.getResponse()));
}
return null;
}

//region Asynchronous API
Expand All @@ -81,7 +98,7 @@ public CompletableFuture<QuerySuccess<Object>> asyncQuery(Query fql) {
}
Codec<Object> codec = codecProvider.get(Object.class, null);
return new RetryHandler<QuerySuccess<Object>>(getRetryStrategy()).execute(FaunaClient.makeAsyncRequest(
getHttpClient(), getRequestBuilder().buildRequest(fql, null, codecProvider, lastTransactionTs.get()), codec)).thenApply(this::updateTxnTs);
getHttpClient(), getRequestBuilder().buildRequest(fql, null, codecProvider, lastTransactionTs.get()), codec)).whenComplete(this::completeRequest);
}

/**
Expand All @@ -103,7 +120,7 @@ public <T> CompletableFuture<QuerySuccess<T>> asyncQuery(Query fql, Class<T> res
}
Codec<T> codec = codecProvider.get(resultClass, null);
return new RetryHandler<QuerySuccess<T>>(getRetryStrategy()).execute(FaunaClient.makeAsyncRequest(
getHttpClient(), getRequestBuilder().buildRequest(fql, options, codecProvider, lastTransactionTs.get()), codec)).thenApply(this::updateTxnTs);
getHttpClient(), getRequestBuilder().buildRequest(fql, options, codecProvider, lastTransactionTs.get()), codec)).whenComplete(this::completeRequest);
}

/**
Expand All @@ -126,7 +143,7 @@ public <E> CompletableFuture<QuerySuccess<E>> asyncQuery(Query fql, Parameterize
@SuppressWarnings("unchecked")
Codec<E> codec = codecProvider.get((Class<E>) parameterizedType.getRawType(), parameterizedType.getActualTypeArguments());
return new RetryHandler<QuerySuccess<E>>(getRetryStrategy()).execute(FaunaClient.makeAsyncRequest(
getHttpClient(), getRequestBuilder().buildRequest(fql, options, codecProvider, lastTransactionTs.get()), codec)).thenApply(this::updateTxnTs);
getHttpClient(), getRequestBuilder().buildRequest(fql, options, codecProvider, lastTransactionTs.get()), codec)).whenComplete(this::completeRequest);
}

/**
Expand Down Expand Up @@ -164,7 +181,7 @@ public <E> CompletableFuture<QuerySuccess<E>> asyncQuery(Query fql, Parameterize
@SuppressWarnings("unchecked")
Codec<E> codec = codecProvider.get((Class<E>) parameterizedType.getRawType(), parameterizedType.getActualTypeArguments());
return new RetryHandler<QuerySuccess<E>>(getRetryStrategy()).execute(FaunaClient.makeAsyncRequest(
getHttpClient(), getRequestBuilder().buildRequest(fql, null, codecProvider, lastTransactionTs.get()), codec)).thenApply(this::updateTxnTs);
getHttpClient(), getRequestBuilder().buildRequest(fql, null, codecProvider, lastTransactionTs.get()), codec)).whenComplete(this::completeRequest);
}
//endregion

Expand Down
19 changes: 10 additions & 9 deletions src/main/java/com/fauna/response/QueryResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,16 @@ private static <T> Builder<T> handleField(Builder<T> builder, JsonParser parser)

public static <T> QuerySuccess<T> parseResponse(HttpResponse<InputStream> response, Codec<T> codec) throws FaunaException {
try {
JsonParser parser = JSON_FACTORY.createParser(response.body());
JsonToken firstToken = parser.nextToken();
Builder<T> builder = QueryResponse.builder(codec);
if (firstToken != JsonToken.START_OBJECT) {
throw new ClientResponseException("Response must be JSON object.");
}
while (parser.nextToken() == JsonToken.FIELD_NAME) {
builder = handleField(builder, parser);
}
JsonParser parser = JSON_FACTORY.createParser(response.body());

JsonToken firstToken = parser.nextToken();
Builder<T> builder = QueryResponse.builder(codec);
if (firstToken != JsonToken.START_OBJECT) {
throw new ClientResponseException("Response must be JSON object.");
}
while (parser.nextToken() == JsonToken.FIELD_NAME) {
builder = handleField(builder, parser);
}
int httpStatus = response.statusCode();
if (httpStatus >= 400) {
QueryFailure failure = new QueryFailure(httpStatus, builder);
Expand Down
12 changes: 11 additions & 1 deletion src/test/java/com/fauna/e2e/E2EQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fauna.client.FaunaClient;
import com.fauna.e2e.beans.Author;
import com.fauna.exception.AbortException;
import com.fauna.exception.QueryRuntimeException;
import com.fauna.query.QueryOptions;
import com.fauna.query.builder.Query;
import com.fauna.response.QuerySuccess;
Expand Down Expand Up @@ -50,14 +51,23 @@ public void query_sync() {
}

@Test
public void clientTransactionTs() {
public void clientTransactionTsOnSuccess() {
FaunaClient client = Fauna.local();
assertTrue(client.getLastTransactionTs().isEmpty());
client.query(fql("42"));
long y2k = Instant.parse("1999-12-31T23:59:59.99Z").getEpochSecond() * 1_000_000;
assertTrue(client.getLastTransactionTs().orElseThrow() > y2k);
}

@Test
public void clientTransactionTsOnFailure() {
FaunaClient client = Fauna.local();
assertTrue(client.getLastTransactionTs().isEmpty());
assertThrows(QueryRuntimeException.class, () -> client.query(fql("NonExistantCollection.all()")));
long y2k = Instant.parse("1999-12-31T23:59:59.99Z").getEpochSecond() * 1_000_000;
assertTrue(client.getLastTransactionTs().orElseThrow() > y2k);
}

@Test
public void query_syncWithClass() {
var q = fql("42");
Expand Down

0 comments on commit 5908704

Please sign in to comment.