diff --git a/src/main/java/com/fauna/client/FaunaConfig.java b/src/main/java/com/fauna/client/FaunaConfig.java index 851a5769..7f725161 100644 --- a/src/main/java/com/fauna/client/FaunaConfig.java +++ b/src/main/java/com/fauna/client/FaunaConfig.java @@ -1,5 +1,6 @@ package com.fauna.client; +import java.time.Duration; import java.util.Optional; import java.util.logging.ConsoleHandler; import java.util.logging.Handler; @@ -7,7 +8,7 @@ /** * FaunaConfig is a configuration class used to set up and configure a connection to Fauna. - * It encapsulates various settings such as the endpoint URL, secret key, query timeout, and others. + * It encapsulates various settings such as the endpoint URL, secret key, and more. */ public class FaunaConfig { @@ -19,6 +20,7 @@ public static class FaunaEndpoint { private final String endpoint; private final String secret; private final int maxContentionRetries; + private final Duration clientTimeoutBuffer; private final Handler logHandler; public static final FaunaConfig DEFAULT = FaunaConfig.builder().build(); public static final FaunaConfig LOCAL = FaunaConfig.builder().endpoint( @@ -33,6 +35,7 @@ private FaunaConfig(Builder builder) { this.endpoint = builder.endpoint != null ? builder.endpoint : FaunaEndpoint.DEFAULT; this.secret = builder.secret != null ? builder.secret : ""; this.maxContentionRetries = builder.maxContentionRetries; + this.clientTimeoutBuffer = builder.clientTimeoutBuffer; this.logHandler = builder.logHandler; } @@ -63,6 +66,14 @@ public int getMaxContentionRetries() { return maxContentionRetries; } + /** + * Gets the buffer that will be added to the HTTP client timeout, in addition to any query timeout. + * @return The timeout buffer Duration. + */ + public Duration getClientTimeoutBuffer() { + return clientTimeoutBuffer; + } + /** * Gets the log handler that the client will use. * @return A log handler instance. @@ -71,7 +82,6 @@ public Handler getLogHandler() { return logHandler; } - /** * Creates a new builder for FaunaConfig. * @@ -88,6 +98,7 @@ public static class Builder { private String endpoint = FaunaEnvironment.faunaEndpoint().orElse(FaunaEndpoint.DEFAULT); private String secret = FaunaEnvironment.faunaSecret().orElse(""); private int maxContentionRetries = 3; + private Duration clientTimeoutBuffer = Duration.ofSeconds(5); private Handler logHandler = defaultLogHandler(); static Level getLogLevel(String debug) { @@ -141,6 +152,14 @@ public Builder maxContentionRetries(int maxContentionRetries) { return this; } + /** + * Set the client timeout buffer. + */ + public Builder clientTimeoutBuffer(Duration duration) { + this.clientTimeoutBuffer = duration; + return this; + } + /** * Override the default log handler with the given log handler. * @param handler A log handler instance. @@ -151,7 +170,7 @@ public Builder logHandler(Handler handler) { return this; } - /** + /** * Builds and returns a new FaunaConfig instance. * * @return A new instance of FaunaConfig. diff --git a/src/main/java/com/fauna/client/RequestBuilder.java b/src/main/java/com/fauna/client/RequestBuilder.java index f731e64c..3f885bbb 100644 --- a/src/main/java/com/fauna/client/RequestBuilder.java +++ b/src/main/java/com/fauna/client/RequestBuilder.java @@ -17,6 +17,7 @@ import java.net.http.HttpRequest; import java.nio.charset.StandardCharsets; import java.text.MessageFormat; +import java.time.Duration; import java.util.logging.Logger; import static com.fauna.client.Logging.headersAsString; @@ -31,6 +32,7 @@ public class RequestBuilder { private static final String STREAM_PATH = "/stream/1"; private final HttpRequest.Builder baseRequestBuilder; + private final Duration clientTimeoutBuffer; private final Logger logger; static class FieldNames { @@ -56,7 +58,7 @@ static class Headers { static final String FORMAT = "X-Format"; } - public RequestBuilder(URI uri, String token, int maxContentionRetries, Logger logger) { + public RequestBuilder(URI uri, String token, int maxContentionRetries, Duration clientTimeoutBuffer, Logger logger) { // DriverEnvironment is not needed outside the constructor for now. DriverEnvironment env = new DriverEnvironment(DriverEnvironment.JvmDriver.JAVA); this.baseRequestBuilder = HttpRequest.newBuilder().uri(uri).headers( @@ -68,32 +70,36 @@ public RequestBuilder(URI uri, String token, int maxContentionRetries, Logger lo RequestBuilder.Headers.MAX_CONTENTION_RETRIES, String.valueOf(maxContentionRetries), Headers.AUTHORIZATION, buildAuthHeader(token) ); + this.clientTimeoutBuffer = clientTimeoutBuffer; this.logger = logger; } - public RequestBuilder(HttpRequest.Builder builder, Logger logger) { + public RequestBuilder(HttpRequest.Builder builder, Duration clientTimeoutBuffer, Logger logger) { this.baseRequestBuilder = builder; + this.clientTimeoutBuffer = clientTimeoutBuffer; this.logger = logger; } public static RequestBuilder queryRequestBuilder(FaunaConfig config, Logger logger) { - return new RequestBuilder(URI.create(config.getEndpoint() + QUERY_PATH), config.getSecret(), config.getMaxContentionRetries(), logger); + return new RequestBuilder(URI.create(config.getEndpoint() + QUERY_PATH), config.getSecret(), config.getMaxContentionRetries(), config.getClientTimeoutBuffer(), logger); } public static RequestBuilder streamRequestBuilder(FaunaConfig config, Logger logger) { - return new RequestBuilder(URI.create(config.getEndpoint() + STREAM_PATH), config.getSecret(), config.getMaxContentionRetries(), logger); + return new RequestBuilder(URI.create(config.getEndpoint() + STREAM_PATH), config.getSecret(), config.getMaxContentionRetries(), config.getClientTimeoutBuffer(), logger); } public RequestBuilder scopedRequestBuilder(String token) { HttpRequest.Builder newBuilder = this.baseRequestBuilder.copy(); // .setHeader(..) clears existing headers (which we want) while .header(..) would append it :) newBuilder.setHeader(Headers.AUTHORIZATION, buildAuthHeader(token)); - return new RequestBuilder(newBuilder, logger); + return new RequestBuilder(newBuilder, clientTimeoutBuffer, logger); } private void logRequest(String body, HttpRequest req) { - logger.fine(MessageFormat.format("Fauna HTTP {0} Request to {1} (timeout {2}), headers: {3}", - req.method(), req.uri(), req.timeout(), headersAsString(req.headers()))); + String timeout = req.timeout().map( + val -> MessageFormat.format(" (timeout: {0})", val)).orElse(""); + logger.fine(MessageFormat.format("Fauna HTTP {0} Request to {1}{2}, headers: {3}", + req.method(), req.uri(), timeout, headersAsString(req.headers()))); logger.finest("Request body: " + body); } @@ -139,6 +145,7 @@ public String buildStreamRequestBody(StreamRequest request) throws IOException { public HttpRequest buildStreamRequest(StreamRequest request) { HttpRequest.Builder builder = baseRequestBuilder.copy(); + request.getTimeout().ifPresent(builder::timeout); try { String body = buildStreamRequestBody(request); HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(body)).build(); @@ -166,7 +173,11 @@ private HttpRequest.Builder getBuilder(QueryOptions options, Long lastTxnTs) { builder.setHeader(Headers.LAST_TXN_TS, String.valueOf(lastTxnTs)); } if (options != null) { - options.getTimeoutMillis().ifPresent(val -> builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val))); + + options.getTimeoutMillis().ifPresent(val -> { + builder.timeout(Duration.ofMillis(val).plus(clientTimeoutBuffer)); + builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val)); + }); options.getLinearized().ifPresent(val -> builder.header(Headers.LINEARIZED, String.valueOf(val))); options.getTypeCheck().ifPresent(val -> builder.header(Headers.TYPE_CHECK, String.valueOf(val))); options.getTraceParent().ifPresent(val -> builder.header(Headers.TRACE_PARENT, val)); diff --git a/src/main/java/com/fauna/stream/StreamRequest.java b/src/main/java/com/fauna/stream/StreamRequest.java index 0890f67f..5d3a7754 100644 --- a/src/main/java/com/fauna/stream/StreamRequest.java +++ b/src/main/java/com/fauna/stream/StreamRequest.java @@ -2,6 +2,7 @@ import com.fauna.query.StreamTokenResponse; +import java.time.Duration; import java.util.Optional; /** @@ -10,48 +11,131 @@ public class StreamRequest { private final String token; private final String cursor; - private final Long start_ts; + private final Long startTs; + private final Duration timeout; - public StreamRequest(String token) { + StreamRequest(String token, String cursor, Long startTs, Duration timeout) { this.token = token; - this.cursor = null; - this.start_ts = null; + this.cursor = cursor; + this.startTs = startTs; + this.timeout = timeout; if (token == null || token.isEmpty()) { throw new IllegalArgumentException("token cannot be null or empty"); } + if (cursor != null && startTs != null) { + throw new IllegalArgumentException("Only one of cursor, or start_ts can be set."); + } } - public StreamRequest(String token, String cursor) { - this.token = token; - this.cursor = cursor; - this.start_ts = null; - if (token == null || token.isEmpty()) { - throw new IllegalArgumentException("token cannot be null or empty"); - } + public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) { + return new StreamRequest(tokenResponse.getToken(), null, null, null); } - public StreamRequest(String token, Long start_ts) { - this.token = token; - this.cursor = null; - this.start_ts = start_ts; - if (token == null || token.isEmpty()) { - throw new IllegalArgumentException("token cannot be null or empty"); + public static class Builder { + final String token; + String cursor = null; + Long startTs = null; + Duration timeout = null; + + /** + * Return a new StreamRequest.Builder instance with the given token. + * @param token A Fauna Stream token. + */ + public Builder(String token) { + this.token = token; } + + /** + * Return the current Builder instance with the given cursor. + * @param cursor A Fauna Stream cursor. + * @return The current Builder instance. + * @throws IllegalArgumentException If startTs has already been set. + */ + public Builder cursor(String cursor) { + if (this.startTs != null) { + throw new IllegalArgumentException("only one of cursor, or startTs can be set."); + } + this.cursor = cursor; + return this; + } + + /** + * Return the current Builder instance with the given start timestamp. + * @param startTs A timestamp to start the stream at. + * @return The current Builder instance. + * @throws IllegalArgumentException If startTs has already been set. + */ + public Builder startTs(Long startTs) { + if (this.cursor != null) { + throw new IllegalArgumentException("only one of cursor, or startTs can be set."); + } + this.startTs = startTs; + return this; + } + + /** + * Return the current builder instance with the given timeout. + * This timeout is the HTTP client timeout that is passed to java.net.http.HttpRequest.Builder. + * The Java documentation says that if "the response is not received within the specified timeout then an + * HttpTimeoutException is thrown". For streaming this means that the exception is thrown if the first + * headers/bytes are not recieved within the timeout. + * + * The default value is null if the user does not set this timeout. + * @param timeout A Duration representing the timeout. + * @return The current Builder instance. + */ + public Builder timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public StreamRequest build() { + return new StreamRequest(token, cursor, startTs, timeout); + } + } - public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) { - return new StreamRequest(tokenResponse.getToken()); + /** + * Create a new StreamRequest.Builder instance. + * @param token The Fauna Stream token to use. + * @return A new StreamRequest.Builder instance. + */ + public static Builder builder(String token) { + return new Builder(token); } + /** + * Stream token for the event stream to subscribe to. + * @return A String representing the Stream token. + */ public String getToken() { return token; } + /** + * Cursor for a previous event. If provided, the stream replays any events that occurred after + * the cursor (exclusive). + * @return The cursor, or Optional.empty() if not provided. + */ public Optional getCursor() { return Optional.ofNullable(cursor); } + /** + * Stream start time in microseconds since the Unix epoch. This is typically a previous event's txn_ts + * (transaction timestamp). + * @return The stream start time, as a Long, or Optional.empty() if not provided. + */ public Optional getStartTs() { - return Optional.ofNullable(start_ts); + return Optional.ofNullable(startTs); + } + + /** + * Stream HTTP request timeout. This timeout is passed to java.net.http.HttpRequest.Builder. The default + * is null/empty. + * @return The timeout Duration, or Optional.empty() if not set. + */ + public Optional getTimeout() { + return Optional.ofNullable(timeout); } } diff --git a/src/test/java/com/fauna/client/FaunaConfigTest.java b/src/test/java/com/fauna/client/FaunaConfigTest.java index d54a2b45..170c7a88 100644 --- a/src/test/java/com/fauna/client/FaunaConfigTest.java +++ b/src/test/java/com/fauna/client/FaunaConfigTest.java @@ -4,6 +4,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.time.Duration; import java.util.logging.ConsoleHandler; import java.util.logging.Level; @@ -32,7 +33,9 @@ public void testOverridingDefaultFaunaConfig() { .secret("foo") .endpoint("endpoint") .logHandler(handler) - .maxContentionRetries(1).build(); + .maxContentionRetries(1) + .clientTimeoutBuffer(Duration.ofSeconds(1)) + .build(); assertEquals("endpoint", config.getEndpoint()); assertEquals(Level.ALL, config.getLogHandler().getLevel()); assertEquals("foo", config.getSecret()); diff --git a/src/test/java/com/fauna/client/RequestBuilderTest.java b/src/test/java/com/fauna/client/RequestBuilderTest.java index 5b8ccc02..d4cfe664 100644 --- a/src/test/java/com/fauna/client/RequestBuilderTest.java +++ b/src/test/java/com/fauna/client/RequestBuilderTest.java @@ -54,6 +54,7 @@ void buildRequest_shouldConstructCorrectHttpRequest() { assertEquals("POST", httpRequest.method()); assertTrue(httpRequest.bodyPublisher().orElseThrow().contentLength() > 0); HttpHeaders headers = httpRequest.headers(); + assertTrue(httpRequest.timeout().isEmpty()); assertTrue(headers.firstValue(DRIVER_ENV).orElse("").contains("runtime=java")); assertTrue(headers.firstValue(DRIVER_ENV).orElse("").contains("driver=")); assertNotNull(headers.firstValue(AUTHORIZATION)); @@ -76,12 +77,26 @@ void buildRequest_shouldIncludeOptionalHeadersWhenPresent() { assertNotNull(headers.firstValue(QUERY_TAGS)); assertEquals("traceParent", headers.firstValue(TRACE_PARENT).orElseThrow()); assertEquals("1", headers.firstValue(LAST_TXN_TS).orElseThrow()); + // Query timeout + 5 seconds (default). + assertEquals(Duration.ofSeconds(20), httpRequest.timeout().orElseThrow()); + } + + @Test + void buildRequest_withCustomTimeoutBuffer() { + QueryOptions defaultOpts = QueryOptions.builder().build(); + QueryOptions timeoutOpts = QueryOptions.builder().timeout(Duration.ofSeconds(15)).build(); + + RequestBuilder requestBuilder = RequestBuilder.queryRequestBuilder( + FaunaConfig.builder().clientTimeoutBuffer(Duration.ofSeconds(1)).build(), Logger.getGlobal()); + HttpRequest req = requestBuilder.buildRequest(fql("42"), defaultOpts, codecProvider, 1L); + assertEquals(Duration.ofSeconds(6), requestBuilder.buildRequest(fql("42"), defaultOpts, codecProvider, 1L).timeout().orElseThrow()); + assertEquals(Duration.ofSeconds(16), requestBuilder.buildRequest(fql("42"), timeoutOpts, codecProvider, 1L).timeout().orElseThrow()); } @Test void buildStreamRequestBody_shouldOnlyIncludeToken() throws IOException { // Given - StreamRequest request = new StreamRequest("tkn"); + StreamRequest request = StreamRequest.builder("tkn").build(); // When String body = requestBuilder.buildStreamRequestBody(request); // Then @@ -91,7 +106,7 @@ void buildStreamRequestBody_shouldOnlyIncludeToken() throws IOException { @Test void buildStreamRequestBody_shouldIncludeCursor() throws IOException { // Given - StreamRequest request = new StreamRequest("tkn", "cur"); + StreamRequest request = StreamRequest.builder("tkn").cursor("cur").build(); // When String body = requestBuilder.buildStreamRequestBody(request); // Then @@ -101,7 +116,7 @@ void buildStreamRequestBody_shouldIncludeCursor() throws IOException { @Test void buildStreamRequestBody_shouldIncludeTimestamp() throws IOException { // Given - StreamRequest request = new StreamRequest("tkn", Long.MAX_VALUE / 2); + StreamRequest request = StreamRequest.builder("tkn").startTs(Long.MAX_VALUE / 2).build(); // When String body = requestBuilder.buildStreamRequestBody(request); // Then diff --git a/src/test/java/com/fauna/e2e/E2EQueryTest.java b/src/test/java/com/fauna/e2e/E2EQueryTest.java index 503f2d5b..69cd990e 100644 --- a/src/test/java/com/fauna/e2e/E2EQueryTest.java +++ b/src/test/java/com/fauna/e2e/E2EQueryTest.java @@ -5,6 +5,7 @@ import com.fauna.e2e.beans.Author; import com.fauna.exception.AbortException; import com.fauna.exception.QueryRuntimeException; +import com.fauna.exception.QueryTimeoutException; import com.fauna.query.QueryOptions; import com.fauna.query.builder.Query; import com.fauna.response.QuerySuccess; @@ -15,6 +16,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.List; @@ -68,6 +70,14 @@ public void clientTransactionTsOnFailure() { assertTrue(client.getLastTransactionTs().orElseThrow() > y2k); } + @Test + public void queryTimeout() { + QueryOptions opts = QueryOptions.builder().timeout(Duration.ofMillis(1)).build(); + QueryTimeoutException exc = assertThrows(QueryTimeoutException.class, + () -> c.query(fql("Author.byId('9090090')"), listOf(Author.class), opts)); + assertTrue(exc.getMessage().contains("Client set aggressive deadline")); + } + @Test public void query_syncWithClass() { var q = fql("42"); diff --git a/src/test/java/com/fauna/e2e/E2EStreamingTest.java b/src/test/java/com/fauna/e2e/E2EStreamingTest.java index 666d56d4..2ffd4ac3 100644 --- a/src/test/java/com/fauna/e2e/E2EStreamingTest.java +++ b/src/test/java/com/fauna/e2e/E2EStreamingTest.java @@ -4,6 +4,7 @@ import com.fauna.client.FaunaClient; import com.fauna.client.FaunaStream; import com.fauna.e2e.beans.Product; +import com.fauna.exception.ClientException; import com.fauna.query.StreamTokenResponse; import com.fauna.query.builder.Query; import com.fauna.response.QuerySuccess; @@ -13,13 +14,16 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import java.net.http.HttpTimeoutException; import java.text.MessageFormat; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -29,6 +33,7 @@ import static com.fauna.query.builder.Query.fql; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class E2EStreamingTest { @@ -140,7 +145,7 @@ public void handleStreamError() throws InterruptedException { // It would be nice to have another test that generates a stream with normal events, and then an error // event, but this at least tests some of the functionality. QuerySuccess queryResp = client.query(fql("Product.all().toStream()"), StreamTokenResponse.class); - StreamRequest request = new StreamRequest(queryResp.getData().getToken(), "invalid_cursor"); + StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).cursor("invalid_cursor").build(); FaunaStream stream = client.stream(request, Product.class); InventorySubscriber inventory = new InventorySubscriber(); stream.subscribe(inventory); @@ -151,6 +156,17 @@ public void handleStreamError() throws InterruptedException { assertTrue(stream.isClosed()); } + @Test + public void handleStreamTimeout() { + QuerySuccess queryResp = client.query(fql("Product.all().toStream()"), StreamTokenResponse.class); + StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).timeout(Duration.ofMillis(1)).build(); + ClientException exc = assertThrows(ClientException.class, () -> client.stream(request, Product.class)); + assertEquals(ExecutionException.class, exc.getCause().getClass()); + assertEquals(HttpTimeoutException.class, exc.getCause().getCause().getClass()); + + + } + @Disabled("Will fix this for GA, I think the other drivers have this bug too.") @Test public void handleLargeEvents() throws InterruptedException { diff --git a/src/test/java/com/fauna/stream/StreamRequestTest.java b/src/test/java/com/fauna/stream/StreamRequestTest.java index 6e384d3a..249aa92a 100644 --- a/src/test/java/com/fauna/stream/StreamRequestTest.java +++ b/src/test/java/com/fauna/stream/StreamRequestTest.java @@ -9,6 +9,7 @@ import java.io.IOException; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class StreamRequestTest { @@ -16,7 +17,7 @@ public class StreamRequestTest { @Test public void testTokenOnlyRequest() { - StreamRequest req = new StreamRequest("abc"); + StreamRequest req = StreamRequest.builder("abc").build(); assertEquals("abc", req.getToken()); assertTrue(req.getCursor().isEmpty()); assertTrue(req.getStartTs().isEmpty()); @@ -24,7 +25,7 @@ public void testTokenOnlyRequest() { @Test public void testCursorRequest() { - StreamRequest req = new StreamRequest("abc", "def"); + StreamRequest req = StreamRequest.builder("abc").cursor("def").build(); assertEquals("abc", req.getToken()); assertEquals("def", req.getCursor().get()); assertTrue(req.getStartTs().isEmpty()); @@ -32,10 +33,16 @@ public void testCursorRequest() { @Test public void testTsRequest() { - StreamRequest req = new StreamRequest("abc", 1234L); + StreamRequest req = StreamRequest.builder("abc").startTs(1234L).build(); assertEquals("abc", req.getToken()); assertTrue(req.getCursor().isEmpty()); assertEquals(1234L, req.getStartTs().get()); } + @Test + public void testCursorAndTsRequest() { + assertThrows(IllegalArgumentException.class, () -> StreamRequest.builder("tkn").startTs(10L).cursor("hello")); + assertThrows(IllegalArgumentException.class, () -> StreamRequest.builder("tkn").cursor("hello").startTs(10L)); + } + }