Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BT-5174-http-timeout] Allow user-configurable HTTP timeout for query and stream requests. #153

Merged
merged 7 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/main/java/com/fauna/client/RequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.logging.Logger;

import static com.fauna.client.Logging.headersAsString;
Expand All @@ -30,6 +31,8 @@ public class RequestBuilder {
private static final String QUERY_PATH = "/query/1";
private static final String STREAM_PATH = "/stream/1";

private static final int TIMEOUT_BUFFER_MS = 1000;
findgriffin marked this conversation as resolved.
Show resolved Hide resolved

findgriffin marked this conversation as resolved.
Show resolved Hide resolved
private final HttpRequest.Builder baseRequestBuilder;
private final Logger logger;

Expand Down Expand Up @@ -92,8 +95,10 @@ public RequestBuilder scopedRequestBuilder(String token) {
}

private void logRequest(String body, HttpRequest req) {
logger.fine(MessageFormat.format("Fauna HTTP {0} Request to {1} (timeout {2}), headers: {3}",
req.method(), req.uri(), req.timeout(), headersAsString(req.headers())));
String timeout = req.timeout().map(
val -> MessageFormat.format(" (timeout: {0})", val)).orElse("");
logger.fine(MessageFormat.format("Fauna HTTP {0} Request to {1}{2}, headers: {3}",
req.method(), req.uri(), timeout, headersAsString(req.headers())));
logger.finest("Request body: " + body);
}

Expand Down Expand Up @@ -139,6 +144,7 @@ public String buildStreamRequestBody(StreamRequest request) throws IOException {

public HttpRequest buildStreamRequest(StreamRequest request) {
HttpRequest.Builder builder = baseRequestBuilder.copy();
request.getTimeout().ifPresent(builder::timeout);
try {
String body = buildStreamRequestBody(request);
HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(body)).build();
Expand Down Expand Up @@ -166,7 +172,10 @@ private HttpRequest.Builder getBuilder(QueryOptions options, Long lastTxnTs) {
builder.setHeader(Headers.LAST_TXN_TS, String.valueOf(lastTxnTs));
}
if (options != null) {
options.getTimeoutMillis().ifPresent(val -> builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val)));
options.getTimeoutMillis().ifPresent(val -> {
builder.timeout(Duration.ofMillis(val + TIMEOUT_BUFFER_MS));
builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val));
});
options.getLinearized().ifPresent(val -> builder.header(Headers.LINEARIZED, String.valueOf(val)));
options.getTypeCheck().ifPresent(val -> builder.header(Headers.TYPE_CHECK, String.valueOf(val)));
options.getTraceParent().ifPresent(val -> builder.header(Headers.TRACE_PARENT, val));
Expand Down
102 changes: 82 additions & 20 deletions src/main/java/com/fauna/stream/StreamRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fauna.query.StreamTokenResponse;

import java.time.Duration;
import java.util.Optional;

/**
Expand All @@ -10,37 +11,94 @@
public class StreamRequest {
private final String token;
private final String cursor;
private final Long start_ts;
private final Long startTs;
private final Duration timeout;

public StreamRequest(String token) {
StreamRequest(String token, String cursor, Long startTs, Duration timeout) {
this.token = token;
this.cursor = null;
this.start_ts = null;
this.cursor = cursor;
this.startTs = startTs;
this.timeout = timeout;
if (token == null || token.isEmpty()) {
throw new IllegalArgumentException("token cannot be null or empty");
}
if (cursor != null && startTs != null) {
throw new IllegalArgumentException("Only one of cursor, or start_ts can be set.");
}
}

public StreamRequest(String token, String cursor) {
this.token = token;
this.cursor = cursor;
this.start_ts = null;
if (token == null || token.isEmpty()) {
throw new IllegalArgumentException("token cannot be null or empty");
}
public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) {
return new StreamRequest(tokenResponse.getToken(), null, null, null);
}

public StreamRequest(String token, Long start_ts) {
this.token = token;
this.cursor = null;
this.start_ts = start_ts;
if (token == null || token.isEmpty()) {
throw new IllegalArgumentException("token cannot be null or empty");
public static class Builder {
final String token;
String cursor = null;
Long startTs = null;
Duration timeout = null;

/**
* Return a new StreamRequest.Builder instance with the given token.
* @param token A Fauna Stream token.
*/
public Builder(String token) {
this.token = token;
}

/**
* Return the current Builder instance with the given cursor.
* @param cursor A Fauna Stream cursor.
* @return The current Builder instance.
* @throws IllegalArgumentException If startTs has already been set.
*/
public Builder cursor(String cursor) {
if (this.startTs != null) {
throw new IllegalArgumentException("only one of cursor, or startTs can be set.");
}
this.cursor = cursor;
return this;
}

/**
* Return the current Builder instance with the given start timestamp.
* @param startTs A timestamp to start the stream at.
* @return The current Builder instance.
* @throws IllegalArgumentException If startTs has already been set.
*/
public Builder startTs(Long startTs) {
if (this.cursor != null) {
throw new IllegalArgumentException("only one of cursor, or startTs can be set.");
}
this.startTs = startTs;
return this;
}

/**
* Return the current builder instance with the given timeout.
* This timeout is the HTTP client timeout that is passed to java.net.http.HttpRequest.Builder.
* By testing, it seems that the client waits for the first byte, not the last byte of the response.
* Therefore, it's a timeout on the stream being opened, but the stream can stay open indefinitely.
findgriffin marked this conversation as resolved.
Show resolved Hide resolved
* @param timeout A Duration representing the timeout.
* @return The current Builder instance.
*/
public Builder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

public StreamRequest build() {
return new StreamRequest(token, cursor, startTs, timeout);
}

}

public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) {
return new StreamRequest(tokenResponse.getToken());
/**
* Create a new StreamRequest.Builder instance.
* @param token The Fauna Stream token to use.
* @return A new StreamRequest.Builder instance.
*/
public static Builder builder(String token) {
return new Builder(token);
}

public String getToken() {
Expand All @@ -52,6 +110,10 @@ public Optional<String> getCursor() {
}

public Optional<Long> getStartTs() {
return Optional.ofNullable(start_ts);
return Optional.ofNullable(startTs);
}

public Optional<Duration> getTimeout() {
return Optional.ofNullable(timeout);
}
}
6 changes: 3 additions & 3 deletions src/test/java/com/fauna/client/RequestBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void buildRequest_shouldIncludeOptionalHeadersWhenPresent() {
@Test
void buildStreamRequestBody_shouldOnlyIncludeToken() throws IOException {
// Given
StreamRequest request = new StreamRequest("tkn");
StreamRequest request = StreamRequest.builder("tkn").build();
// When
String body = requestBuilder.buildStreamRequestBody(request);
// Then
Expand All @@ -91,7 +91,7 @@ void buildStreamRequestBody_shouldOnlyIncludeToken() throws IOException {
@Test
void buildStreamRequestBody_shouldIncludeCursor() throws IOException {
// Given
StreamRequest request = new StreamRequest("tkn", "cur");
StreamRequest request = StreamRequest.builder("tkn").cursor("cur").build();
// When
String body = requestBuilder.buildStreamRequestBody(request);
// Then
Expand All @@ -101,7 +101,7 @@ void buildStreamRequestBody_shouldIncludeCursor() throws IOException {
@Test
void buildStreamRequestBody_shouldIncludeTimestamp() throws IOException {
// Given
StreamRequest request = new StreamRequest("tkn", Long.MAX_VALUE / 2);
StreamRequest request = StreamRequest.builder("tkn").startTs(Long.MAX_VALUE / 2).build();
// When
String body = requestBuilder.buildStreamRequestBody(request);
// Then
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/com/fauna/e2e/E2EQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import com.fauna.client.FaunaClient;
import com.fauna.e2e.beans.Author;
import com.fauna.exception.AbortException;
import com.fauna.exception.ClientException;
import com.fauna.exception.QueryRuntimeException;
import com.fauna.exception.QueryTimeoutException;
import com.fauna.query.QueryOptions;
import com.fauna.query.builder.Query;
import com.fauna.response.QuerySuccess;
Expand All @@ -15,6 +17,7 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -68,6 +71,14 @@ public void clientTransactionTsOnFailure() {
assertTrue(client.getLastTransactionTs().orElseThrow() > y2k);
}

@Test
public void queryTimeout() {
QueryOptions opts = QueryOptions.builder().timeout(Duration.ofMillis(1)).build();
QueryTimeoutException exc = assertThrows(QueryTimeoutException.class,
() -> c.query(fql("Author.byId('9090090')"), listOf(Author.class), opts));
assertTrue(exc.getMessage().contains("Client set aggressive deadline"));
}

@Test
public void query_syncWithClass() {
var q = fql("42");
Expand Down
18 changes: 17 additions & 1 deletion src/test/java/com/fauna/e2e/E2EStreamingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fauna.client.FaunaClient;
import com.fauna.client.FaunaStream;
import com.fauna.e2e.beans.Product;
import com.fauna.exception.ClientException;
import com.fauna.query.StreamTokenResponse;
import com.fauna.query.builder.Query;
import com.fauna.response.QuerySuccess;
Expand All @@ -13,13 +14,16 @@
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.net.http.HttpTimeoutException;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -29,6 +33,7 @@
import static com.fauna.query.builder.Query.fql;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class E2EStreamingTest {
Expand Down Expand Up @@ -140,7 +145,7 @@ public void handleStreamError() throws InterruptedException {
// It would be nice to have another test that generates a stream with normal events, and then an error
// event, but this at least tests some of the functionality.
QuerySuccess<StreamTokenResponse> queryResp = client.query(fql("Product.all().toStream()"), StreamTokenResponse.class);
StreamRequest request = new StreamRequest(queryResp.getData().getToken(), "invalid_cursor");
StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).cursor("invalid_cursor").build();
FaunaStream stream = client.stream(request, Product.class);
InventorySubscriber inventory = new InventorySubscriber();
stream.subscribe(inventory);
Expand All @@ -151,6 +156,17 @@ public void handleStreamError() throws InterruptedException {
assertTrue(stream.isClosed());
}

@Test
public void handleStreamTimeout() {
QuerySuccess<StreamTokenResponse> queryResp = client.query(fql("Product.all().toStream()"), StreamTokenResponse.class);
StreamRequest request = StreamRequest.builder((queryResp.getData().getToken())).timeout(Duration.ofMillis(1)).build();
ClientException exc = assertThrows(ClientException.class, () -> client.stream(request, Product.class));
assertEquals(ExecutionException.class, exc.getCause().getClass());
assertEquals(HttpTimeoutException.class, exc.getCause().getCause().getClass());


}

@Disabled("Will fix this for GA, I think the other drivers have this bug too.")
@Test
public void handleLargeEvents() throws InterruptedException {
Expand Down
13 changes: 10 additions & 3 deletions src/test/java/com/fauna/stream/StreamRequestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,40 @@
import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class StreamRequestTest {
public static final CodecProvider provider = DefaultCodecProvider.SINGLETON;

@Test
public void testTokenOnlyRequest() {
StreamRequest req = new StreamRequest("abc");
StreamRequest req = StreamRequest.builder("abc").build();
assertEquals("abc", req.getToken());
assertTrue(req.getCursor().isEmpty());
assertTrue(req.getStartTs().isEmpty());
}

@Test
public void testCursorRequest() {
StreamRequest req = new StreamRequest("abc", "def");
StreamRequest req = StreamRequest.builder("abc").cursor("def").build();
assertEquals("abc", req.getToken());
assertEquals("def", req.getCursor().get());
assertTrue(req.getStartTs().isEmpty());
}

@Test
public void testTsRequest() {
StreamRequest req = new StreamRequest("abc", 1234L);
StreamRequest req = StreamRequest.builder("abc").startTs(1234L).build();
assertEquals("abc", req.getToken());
assertTrue(req.getCursor().isEmpty());
assertEquals(1234L, req.getStartTs().get());
}

@Test
public void testCursorAndTsRequest() {
assertThrows(IllegalArgumentException.class, () -> StreamRequest.builder("tkn").startTs(10L).cursor("hello"));
assertThrows(IllegalArgumentException.class, () -> StreamRequest.builder("tkn").cursor("hello").startTs(10L));
}

}
Loading