Skip to content

Commit

Permalink
Allow user-configurable HTTP timeout for query and stream requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
David Griffin committed Oct 9, 2024
1 parent 49b02b8 commit 7be807f
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 30 deletions.
15 changes: 12 additions & 3 deletions src/main/java/com/fauna/client/RequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.logging.Logger;

import static com.fauna.client.Logging.headersAsString;
Expand All @@ -30,6 +31,8 @@ public class RequestBuilder {
private static final String QUERY_PATH = "/query/1";
private static final String STREAM_PATH = "/stream/1";

private static final int TIMEOUT_BUFFER_MS = 1000;

private final HttpRequest.Builder baseRequestBuilder;
private final Logger logger;

Expand Down Expand Up @@ -92,8 +95,10 @@ public RequestBuilder scopedRequestBuilder(String token) {
}

private void logRequest(String body, HttpRequest req) {
logger.fine(MessageFormat.format("Fauna HTTP {0} Request to {1} (timeout {2}), headers: {3}",
req.method(), req.uri(), req.timeout(), headersAsString(req.headers())));
String timeout = req.timeout().map(
val -> MessageFormat.format(" (timeout: {0})", val)).orElse("");
logger.fine(MessageFormat.format("Fauna HTTP {0} Request to {1}{2}, headers: {3}",
req.method(), req.uri(), timeout, headersAsString(req.headers())));
logger.finest("Request body: " + body);
}

Expand Down Expand Up @@ -139,6 +144,7 @@ public String buildStreamRequestBody(StreamRequest request) throws IOException {

public HttpRequest buildStreamRequest(StreamRequest request) {
HttpRequest.Builder builder = baseRequestBuilder.copy();
request.getTimeout().ifPresent(builder::timeout);
try {
String body = buildStreamRequestBody(request);
HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(body)).build();
Expand Down Expand Up @@ -166,7 +172,10 @@ private HttpRequest.Builder getBuilder(QueryOptions options, Long lastTxnTs) {
builder.setHeader(Headers.LAST_TXN_TS, String.valueOf(lastTxnTs));
}
if (options != null) {
options.getTimeoutMillis().ifPresent(val -> builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val)));
options.getTimeoutMillis().ifPresent(val -> {
builder.timeout(Duration.ofMillis(val + TIMEOUT_BUFFER_MS));
builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val));
});
options.getLinearized().ifPresent(val -> builder.header(Headers.LINEARIZED, String.valueOf(val)));
options.getTypeCheck().ifPresent(val -> builder.header(Headers.TYPE_CHECK, String.valueOf(val)));
options.getTraceParent().ifPresent(val -> builder.header(Headers.TRACE_PARENT, val));
Expand Down
102 changes: 82 additions & 20 deletions src/main/java/com/fauna/stream/StreamRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fauna.query.StreamTokenResponse;

import java.time.Duration;
import java.util.Optional;

/**
Expand All @@ -10,37 +11,94 @@
public class StreamRequest {
private final String token;
private final String cursor;
private final Long start_ts;
private final Long startTs;
private final Duration timeout;

public StreamRequest(String token) {
StreamRequest(String token, String cursor, Long startTs, Duration timeout) {
this.token = token;
this.cursor = null;
this.start_ts = null;
this.cursor = cursor;
this.startTs = startTs;
this.timeout = timeout;
if (token == null || token.isEmpty()) {
throw new IllegalArgumentException("token cannot be null or empty");
}
if (cursor != null && startTs != null) {
throw new IllegalArgumentException("Only one of cursor, or start_ts can be set.");
}
}

public StreamRequest(String token, String cursor) {
this.token = token;
this.cursor = cursor;
this.start_ts = null;
if (token == null || token.isEmpty()) {
throw new IllegalArgumentException("token cannot be null or empty");
}
public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) {
return new StreamRequest(tokenResponse.getToken(), null, null, null);
}

public StreamRequest(String token, Long start_ts) {
this.token = token;
this.cursor = null;
this.start_ts = start_ts;
if (token == null || token.isEmpty()) {
throw new IllegalArgumentException("token cannot be null or empty");
public static class Builder {
final String token;
String cursor = null;
Long startTs = null;
Duration timeout = null;

/**
* Return a new StreamRequest.Builder instance with the given token.
* @param token A Fauna Stream token.
*/
public Builder(String token) {
this.token = token;
}

/**
* Return the current Builder instance with the given cursor.
* @param cursor A Fauna Stream cursor.
* @return The current Builder instance.
* @throws IllegalArgumentException If startTs has already been set.
*/
public Builder cursor(String cursor) {
if (this.startTs != null) {
throw new IllegalArgumentException("only one of cursor, or startTs can be set.");
}
this.cursor = cursor;
return this;
}

/**
* Return the current Builder instance with the given start timestamp.
* @param startTs A timestamp to start the stream at.
* @return The current Builder instance.
* @throws IllegalArgumentException If startTs has already been set.
*/
public Builder startTs(Long startTs) {
if (this.cursor != null) {
throw new IllegalArgumentException("only one of cursor, or startTs can be set.");
}
this.startTs = startTs;
return this;
}

/**
* Return the current builder instance with the given timeout.
* This timeout is the HTTP client timeout that is passed to java.net.http.HttpRequest.Builder.
* By testing, it seems that the client waits for the first byte, not the last byte of the response.
* Therefore, it's a timeout on the stream being opened, but the stream can stay open indefinitely.
* @param timeout A Duration representing the timeout.
* @return The current Builder instance.
*/
public Builder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

public StreamRequest build() {
return new StreamRequest(token, cursor, startTs, timeout);
}

}

public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) {
return new StreamRequest(tokenResponse.getToken());
/**
* Create a new StreamRequest.Builder instance.
* @param token The Fauna Stream token to use.
* @return A new StreamRequest.Builder instance.
*/
public static Builder builder(String token) {
return new Builder(token);
}

public String getToken() {
Expand All @@ -52,6 +110,10 @@ public Optional<String> getCursor() {
}

public Optional<Long> getStartTs() {
return Optional.ofNullable(start_ts);
return Optional.ofNullable(startTs);
}

public Optional<Duration> getTimeout() {
return Optional.ofNullable(timeout);
}
}
6 changes: 3 additions & 3 deletions src/test/java/com/fauna/client/RequestBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void buildRequest_shouldIncludeOptionalHeadersWhenPresent() {
@Test
void buildStreamRequestBody_shouldOnlyIncludeToken() throws IOException {
// Given
StreamRequest request = new StreamRequest("tkn");
StreamRequest request = StreamRequest.builder("tkn").build();
// When
String body = requestBuilder.buildStreamRequestBody(request);
// Then
Expand All @@ -91,7 +91,7 @@ void buildStreamRequestBody_shouldOnlyIncludeToken() throws IOException {
@Test
void buildStreamRequestBody_shouldIncludeCursor() throws IOException {
// Given
StreamRequest request = new StreamRequest("tkn", "cur");
StreamRequest request = StreamRequest.builder("tkn").cursor("cur").build();
// When
String body = requestBuilder.buildStreamRequestBody(request);
// Then
Expand All @@ -101,7 +101,7 @@ void buildStreamRequestBody_shouldIncludeCursor() throws IOException {
@Test
void buildStreamRequestBody_shouldIncludeTimestamp() throws IOException {
// Given
StreamRequest request = new StreamRequest("tkn", Long.MAX_VALUE / 2);
StreamRequest request = StreamRequest.builder("tkn").startTs(Long.MAX_VALUE / 2).build();
// When
String body = requestBuilder.buildStreamRequestBody(request);
// Then
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/com/fauna/e2e/E2EQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import com.fauna.client.FaunaClient;
import com.fauna.e2e.beans.Author;
import com.fauna.exception.AbortException;
import com.fauna.exception.ClientException;
import com.fauna.exception.QueryRuntimeException;
import com.fauna.exception.QueryTimeoutException;
import com.fauna.query.QueryOptions;
import com.fauna.query.builder.Query;
import com.fauna.response.QuerySuccess;
Expand All @@ -15,6 +17,7 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -68,6 +71,14 @@ public void clientTransactionTsOnFailure() {
assertTrue(client.getLastTransactionTs().orElseThrow() > y2k);
}

@Test
public void queryTimeout() {
QueryOptions opts = QueryOptions.builder().timeout(Duration.ofMillis(1)).build();
QueryTimeoutException exc = assertThrows(QueryTimeoutException.class,
() -> c.query(fql("Author.byId('9090090')"), listOf(Author.class), opts));
assertTrue(exc.getMessage().contains("Client set aggressive deadline"));
}

@Test
public void query_syncWithClass() {
var q = fql("42");
Expand Down
18 changes: 17 additions & 1 deletion src/test/java/com/fauna/e2e/E2EStreamingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fauna.client.FaunaClient;
import com.fauna.client.FaunaStream;
import com.fauna.e2e.beans.Product;
import com.fauna.exception.ClientException;
import com.fauna.query.StreamTokenResponse;
import com.fauna.query.builder.Query;
import com.fauna.response.QuerySuccess;
Expand All @@ -13,13 +14,16 @@
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.net.http.HttpTimeoutException;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -29,6 +33,7 @@
import static com.fauna.query.builder.Query.fql;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class E2EStreamingTest {
Expand Down Expand Up @@ -140,7 +145,7 @@ public void handleStreamError() throws InterruptedException {
// It would be nice to have another test that generates a stream with normal events, and then an error
// event, but this at least tests some of the functionality.
QuerySuccess<StreamTokenResponse> queryResp = client.query(fql("Product.all().toStream()"), StreamTokenResponse.class);
StreamRequest request = new StreamRequest(queryResp.getData().getToken(), "invalid_cursor");
StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).cursor("invalid_cursor").build();
FaunaStream stream = client.stream(request, Product.class);
InventorySubscriber inventory = new InventorySubscriber();
stream.subscribe(inventory);
Expand All @@ -151,6 +156,17 @@ public void handleStreamError() throws InterruptedException {
assertTrue(stream.isClosed());
}

@Test
public void handleStreamTimeout() {
QuerySuccess<StreamTokenResponse> queryResp = client.query(fql("Product.all().toStream()"), StreamTokenResponse.class);
StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).timeout(Duration.ofMillis(1)).build();
ClientException exc = assertThrows(ClientException.class, () -> client.stream(request, Product.class));
assertEquals(ExecutionException.class, exc.getCause().getClass());
assertEquals(HttpTimeoutException.class, exc.getCause().getCause().getClass());


}

@Disabled("Will fix this for GA, I think the other drivers have this bug too.")
@Test
public void handleLargeEvents() throws InterruptedException {
Expand Down
13 changes: 10 additions & 3 deletions src/test/java/com/fauna/stream/StreamRequestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,40 @@
import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class StreamRequestTest {
public static final CodecProvider provider = DefaultCodecProvider.SINGLETON;

@Test
public void testTokenOnlyRequest() {
StreamRequest req = new StreamRequest("abc");
StreamRequest req = StreamRequest.builder("abc").build();
assertEquals("abc", req.getToken());
assertTrue(req.getCursor().isEmpty());
assertTrue(req.getStartTs().isEmpty());
}

@Test
public void testCursorRequest() {
StreamRequest req = new StreamRequest("abc", "def");
StreamRequest req = StreamRequest.builder("abc").cursor("def").build();
assertEquals("abc", req.getToken());
assertEquals("def", req.getCursor().get());
assertTrue(req.getStartTs().isEmpty());
}

@Test
public void testTsRequest() {
StreamRequest req = new StreamRequest("abc", 1234L);
StreamRequest req = StreamRequest.builder("abc").startTs(1234L).build();
assertEquals("abc", req.getToken());
assertTrue(req.getCursor().isEmpty());
assertEquals(1234L, req.getStartTs().get());
}

@Test
public void testCursorAndTsRequest() {
assertThrows(IllegalArgumentException.class, () -> StreamRequest.builder("tkn").startTs(10L).cursor("hello"));
assertThrows(IllegalArgumentException.class, () -> StreamRequest.builder("tkn").cursor("hello").startTs(10L));
}

}

0 comments on commit 7be807f

Please sign in to comment.