From 7be807fb509fd714c61d1d9f2ffd90022e114a6e Mon Sep 17 00:00:00 2001 From: David Griffin Date: Wed, 9 Oct 2024 16:05:11 -0700 Subject: [PATCH] Allow user-configurable HTTP timeout for query and stream requests. --- .../java/com/fauna/client/RequestBuilder.java | 15 ++- .../java/com/fauna/stream/StreamRequest.java | 102 ++++++++++++++---- .../com/fauna/client/RequestBuilderTest.java | 6 +- src/test/java/com/fauna/e2e/E2EQueryTest.java | 11 ++ .../java/com/fauna/e2e/E2EStreamingTest.java | 18 +++- .../com/fauna/stream/StreamRequestTest.java | 13 ++- 6 files changed, 135 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/fauna/client/RequestBuilder.java b/src/main/java/com/fauna/client/RequestBuilder.java index f731e64c..86de8f0d 100644 --- a/src/main/java/com/fauna/client/RequestBuilder.java +++ b/src/main/java/com/fauna/client/RequestBuilder.java @@ -17,6 +17,7 @@ import java.net.http.HttpRequest; import java.nio.charset.StandardCharsets; import java.text.MessageFormat; +import java.time.Duration; import java.util.logging.Logger; import static com.fauna.client.Logging.headersAsString; @@ -30,6 +31,8 @@ public class RequestBuilder { private static final String QUERY_PATH = "/query/1"; private static final String STREAM_PATH = "/stream/1"; + private static final int TIMEOUT_BUFFER_MS = 1000; + private final HttpRequest.Builder baseRequestBuilder; private final Logger logger; @@ -92,8 +95,10 @@ public RequestBuilder scopedRequestBuilder(String token) { } private void logRequest(String body, HttpRequest req) { - logger.fine(MessageFormat.format("Fauna HTTP {0} Request to {1} (timeout {2}), headers: {3}", - req.method(), req.uri(), req.timeout(), headersAsString(req.headers()))); + String timeout = req.timeout().map( + val -> MessageFormat.format(" (timeout: {0})", val)).orElse(""); + logger.fine(MessageFormat.format("Fauna HTTP {0} Request to {1}{2}, headers: {3}", + req.method(), req.uri(), timeout, headersAsString(req.headers()))); logger.finest("Request body: " + body); } @@ -139,6 +144,7 @@ public String buildStreamRequestBody(StreamRequest request) throws IOException { public HttpRequest buildStreamRequest(StreamRequest request) { HttpRequest.Builder builder = baseRequestBuilder.copy(); + request.getTimeout().ifPresent(builder::timeout); try { String body = buildStreamRequestBody(request); HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(body)).build(); @@ -166,7 +172,10 @@ private HttpRequest.Builder getBuilder(QueryOptions options, Long lastTxnTs) { builder.setHeader(Headers.LAST_TXN_TS, String.valueOf(lastTxnTs)); } if (options != null) { - options.getTimeoutMillis().ifPresent(val -> builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val))); + options.getTimeoutMillis().ifPresent(val -> { + builder.timeout(Duration.ofMillis(val + TIMEOUT_BUFFER_MS)); + builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val)); + }); options.getLinearized().ifPresent(val -> builder.header(Headers.LINEARIZED, String.valueOf(val))); options.getTypeCheck().ifPresent(val -> builder.header(Headers.TYPE_CHECK, String.valueOf(val))); options.getTraceParent().ifPresent(val -> builder.header(Headers.TRACE_PARENT, val)); diff --git a/src/main/java/com/fauna/stream/StreamRequest.java b/src/main/java/com/fauna/stream/StreamRequest.java index 0890f67f..b691773d 100644 --- a/src/main/java/com/fauna/stream/StreamRequest.java +++ b/src/main/java/com/fauna/stream/StreamRequest.java @@ -2,6 +2,7 @@ import com.fauna.query.StreamTokenResponse; +import java.time.Duration; import java.util.Optional; /** @@ -10,37 +11,94 @@ public class StreamRequest { private final String token; private final String cursor; - private final Long start_ts; + private final Long startTs; + private final Duration timeout; - public StreamRequest(String token) { + StreamRequest(String token, String cursor, Long startTs, Duration timeout) { this.token = token; - this.cursor = null; - this.start_ts = null; + this.cursor = cursor; + this.startTs = startTs; + this.timeout = timeout; if (token == null || token.isEmpty()) { throw new IllegalArgumentException("token cannot be null or empty"); } + if (cursor != null && startTs != null) { + throw new IllegalArgumentException("Only one of cursor, or start_ts can be set."); + } } - public StreamRequest(String token, String cursor) { - this.token = token; - this.cursor = cursor; - this.start_ts = null; - if (token == null || token.isEmpty()) { - throw new IllegalArgumentException("token cannot be null or empty"); - } + public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) { + return new StreamRequest(tokenResponse.getToken(), null, null, null); } - public StreamRequest(String token, Long start_ts) { - this.token = token; - this.cursor = null; - this.start_ts = start_ts; - if (token == null || token.isEmpty()) { - throw new IllegalArgumentException("token cannot be null or empty"); + public static class Builder { + final String token; + String cursor = null; + Long startTs = null; + Duration timeout = null; + + /** + * Return a new StreamRequest.Builder instance with the given token. + * @param token A Fauna Stream token. + */ + public Builder(String token) { + this.token = token; } + + /** + * Return the current Builder instance with the given cursor. + * @param cursor A Fauna Stream cursor. + * @return The current Builder instance. + * @throws IllegalArgumentException If startTs has already been set. + */ + public Builder cursor(String cursor) { + if (this.startTs != null) { + throw new IllegalArgumentException("only one of cursor, or startTs can be set."); + } + this.cursor = cursor; + return this; + } + + /** + * Return the current Builder instance with the given start timestamp. + * @param startTs A timestamp to start the stream at. + * @return The current Builder instance. + * @throws IllegalArgumentException If startTs has already been set. + */ + public Builder startTs(Long startTs) { + if (this.cursor != null) { + throw new IllegalArgumentException("only one of cursor, or startTs can be set."); + } + this.startTs = startTs; + return this; + } + + /** + * Return the current builder instance with the given timeout. + * This timeout is the HTTP client timeout that is passed to java.net.http.HttpRequest.Builder. + * By testing, it seems that the client waits for the first byte, not the last byte of the response. + * Therefore, it's a timeout on the stream being opened, but the stream can stay open indefinitely. + * @param timeout A Duration representing the timeout. + * @return The current Builder instance. + */ + public Builder timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public StreamRequest build() { + return new StreamRequest(token, cursor, startTs, timeout); + } + } - public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) { - return new StreamRequest(tokenResponse.getToken()); + /** + * Create a new StreamRequest.Builder instance. + * @param token The Fauna Stream token to use. + * @return A new StreamRequest.Builder instance. + */ + public static Builder builder(String token) { + return new Builder(token); } public String getToken() { @@ -52,6 +110,10 @@ public Optional getCursor() { } public Optional getStartTs() { - return Optional.ofNullable(start_ts); + return Optional.ofNullable(startTs); + } + + public Optional getTimeout() { + return Optional.ofNullable(timeout); } } diff --git a/src/test/java/com/fauna/client/RequestBuilderTest.java b/src/test/java/com/fauna/client/RequestBuilderTest.java index 5b8ccc02..6425ce79 100644 --- a/src/test/java/com/fauna/client/RequestBuilderTest.java +++ b/src/test/java/com/fauna/client/RequestBuilderTest.java @@ -81,7 +81,7 @@ void buildRequest_shouldIncludeOptionalHeadersWhenPresent() { @Test void buildStreamRequestBody_shouldOnlyIncludeToken() throws IOException { // Given - StreamRequest request = new StreamRequest("tkn"); + StreamRequest request = StreamRequest.builder("tkn").build(); // When String body = requestBuilder.buildStreamRequestBody(request); // Then @@ -91,7 +91,7 @@ void buildStreamRequestBody_shouldOnlyIncludeToken() throws IOException { @Test void buildStreamRequestBody_shouldIncludeCursor() throws IOException { // Given - StreamRequest request = new StreamRequest("tkn", "cur"); + StreamRequest request = StreamRequest.builder("tkn").cursor("cur").build(); // When String body = requestBuilder.buildStreamRequestBody(request); // Then @@ -101,7 +101,7 @@ void buildStreamRequestBody_shouldIncludeCursor() throws IOException { @Test void buildStreamRequestBody_shouldIncludeTimestamp() throws IOException { // Given - StreamRequest request = new StreamRequest("tkn", Long.MAX_VALUE / 2); + StreamRequest request = StreamRequest.builder("tkn").startTs(Long.MAX_VALUE / 2).build(); // When String body = requestBuilder.buildStreamRequestBody(request); // Then diff --git a/src/test/java/com/fauna/e2e/E2EQueryTest.java b/src/test/java/com/fauna/e2e/E2EQueryTest.java index 503f2d5b..02f1e326 100644 --- a/src/test/java/com/fauna/e2e/E2EQueryTest.java +++ b/src/test/java/com/fauna/e2e/E2EQueryTest.java @@ -4,7 +4,9 @@ import com.fauna.client.FaunaClient; import com.fauna.e2e.beans.Author; import com.fauna.exception.AbortException; +import com.fauna.exception.ClientException; import com.fauna.exception.QueryRuntimeException; +import com.fauna.exception.QueryTimeoutException; import com.fauna.query.QueryOptions; import com.fauna.query.builder.Query; import com.fauna.response.QuerySuccess; @@ -15,6 +17,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.List; @@ -68,6 +71,14 @@ public void clientTransactionTsOnFailure() { assertTrue(client.getLastTransactionTs().orElseThrow() > y2k); } + @Test + public void queryTimeout() { + QueryOptions opts = QueryOptions.builder().timeout(Duration.ofMillis(1)).build(); + QueryTimeoutException exc = assertThrows(QueryTimeoutException.class, + () -> c.query(fql("Author.byId('9090090')"), listOf(Author.class), opts)); + assertTrue(exc.getMessage().contains("Client set aggressive deadline")); + } + @Test public void query_syncWithClass() { var q = fql("42"); diff --git a/src/test/java/com/fauna/e2e/E2EStreamingTest.java b/src/test/java/com/fauna/e2e/E2EStreamingTest.java index 666d56d4..2ffd4ac3 100644 --- a/src/test/java/com/fauna/e2e/E2EStreamingTest.java +++ b/src/test/java/com/fauna/e2e/E2EStreamingTest.java @@ -4,6 +4,7 @@ import com.fauna.client.FaunaClient; import com.fauna.client.FaunaStream; import com.fauna.e2e.beans.Product; +import com.fauna.exception.ClientException; import com.fauna.query.StreamTokenResponse; import com.fauna.query.builder.Query; import com.fauna.response.QuerySuccess; @@ -13,13 +14,16 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import java.net.http.HttpTimeoutException; import java.text.MessageFormat; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -29,6 +33,7 @@ import static com.fauna.query.builder.Query.fql; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class E2EStreamingTest { @@ -140,7 +145,7 @@ public void handleStreamError() throws InterruptedException { // It would be nice to have another test that generates a stream with normal events, and then an error // event, but this at least tests some of the functionality. QuerySuccess queryResp = client.query(fql("Product.all().toStream()"), StreamTokenResponse.class); - StreamRequest request = new StreamRequest(queryResp.getData().getToken(), "invalid_cursor"); + StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).cursor("invalid_cursor").build(); FaunaStream stream = client.stream(request, Product.class); InventorySubscriber inventory = new InventorySubscriber(); stream.subscribe(inventory); @@ -151,6 +156,17 @@ public void handleStreamError() throws InterruptedException { assertTrue(stream.isClosed()); } + @Test + public void handleStreamTimeout() { + QuerySuccess queryResp = client.query(fql("Product.all().toStream()"), StreamTokenResponse.class); + StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).timeout(Duration.ofMillis(1)).build(); + ClientException exc = assertThrows(ClientException.class, () -> client.stream(request, Product.class)); + assertEquals(ExecutionException.class, exc.getCause().getClass()); + assertEquals(HttpTimeoutException.class, exc.getCause().getCause().getClass()); + + + } + @Disabled("Will fix this for GA, I think the other drivers have this bug too.") @Test public void handleLargeEvents() throws InterruptedException { diff --git a/src/test/java/com/fauna/stream/StreamRequestTest.java b/src/test/java/com/fauna/stream/StreamRequestTest.java index 6e384d3a..249aa92a 100644 --- a/src/test/java/com/fauna/stream/StreamRequestTest.java +++ b/src/test/java/com/fauna/stream/StreamRequestTest.java @@ -9,6 +9,7 @@ import java.io.IOException; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class StreamRequestTest { @@ -16,7 +17,7 @@ public class StreamRequestTest { @Test public void testTokenOnlyRequest() { - StreamRequest req = new StreamRequest("abc"); + StreamRequest req = StreamRequest.builder("abc").build(); assertEquals("abc", req.getToken()); assertTrue(req.getCursor().isEmpty()); assertTrue(req.getStartTs().isEmpty()); @@ -24,7 +25,7 @@ public void testTokenOnlyRequest() { @Test public void testCursorRequest() { - StreamRequest req = new StreamRequest("abc", "def"); + StreamRequest req = StreamRequest.builder("abc").cursor("def").build(); assertEquals("abc", req.getToken()); assertEquals("def", req.getCursor().get()); assertTrue(req.getStartTs().isEmpty()); @@ -32,10 +33,16 @@ public void testCursorRequest() { @Test public void testTsRequest() { - StreamRequest req = new StreamRequest("abc", 1234L); + StreamRequest req = StreamRequest.builder("abc").startTs(1234L).build(); assertEquals("abc", req.getToken()); assertTrue(req.getCursor().isEmpty()); assertEquals(1234L, req.getStartTs().get()); } + @Test + public void testCursorAndTsRequest() { + assertThrows(IllegalArgumentException.class, () -> StreamRequest.builder("tkn").startTs(10L).cursor("hello")); + assertThrows(IllegalArgumentException.class, () -> StreamRequest.builder("tkn").cursor("hello").startTs(10L)); + } + }