Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BT-5174-http-timeout] Allow user-configurable HTTP timeout for query and stream requests. #153

Merged
merged 7 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions src/main/java/com/fauna/client/FaunaConfig.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.fauna.client;

import java.time.Duration;
import java.util.Optional;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;

/**
* FaunaConfig is a configuration class used to set up and configure a connection to Fauna.
* It encapsulates various settings such as the endpoint URL, secret key, query timeout, and others.
* It encapsulates various settings such as the endpoint URL, secret key, and more.
*/
public class FaunaConfig {

Expand All @@ -19,6 +20,7 @@ public static class FaunaEndpoint {
private final String endpoint;
private final String secret;
private final int maxContentionRetries;
private final Duration clientTimeoutBuffer;
private final Handler logHandler;
public static final FaunaConfig DEFAULT = FaunaConfig.builder().build();
public static final FaunaConfig LOCAL = FaunaConfig.builder().endpoint(
Expand All @@ -33,6 +35,7 @@ private FaunaConfig(Builder builder) {
this.endpoint = builder.endpoint != null ? builder.endpoint : FaunaEndpoint.DEFAULT;
this.secret = builder.secret != null ? builder.secret : "";
this.maxContentionRetries = builder.maxContentionRetries;
this.clientTimeoutBuffer = builder.clientTimeoutBuffer;
this.logHandler = builder.logHandler;
}

Expand Down Expand Up @@ -63,6 +66,14 @@ public int getMaxContentionRetries() {
return maxContentionRetries;
}

/**
* Gets the buffer that will be added to the HTTP client timeout, in addition to any query timeout.
* @return The timeout buffer Duration.
*/
public Duration getClientTimeoutBuffer() {
return clientTimeoutBuffer;
}

/**
* Gets the log handler that the client will use.
* @return A log handler instance.
Expand All @@ -71,7 +82,6 @@ public Handler getLogHandler() {
return logHandler;
}


/**
* Creates a new builder for FaunaConfig.
*
Expand All @@ -88,6 +98,7 @@ public static class Builder {
private String endpoint = FaunaEnvironment.faunaEndpoint().orElse(FaunaEndpoint.DEFAULT);
private String secret = FaunaEnvironment.faunaSecret().orElse("");
private int maxContentionRetries = 3;
private Duration clientTimeoutBuffer = Duration.ofSeconds(5);
private Handler logHandler = defaultLogHandler();

static Level getLogLevel(String debug) {
Expand Down Expand Up @@ -141,6 +152,14 @@ public Builder maxContentionRetries(int maxContentionRetries) {
return this;
}

/**
* Set the client timeout buffer.
*/
public Builder clientTimeoutBuffer(Duration duration) {
this.clientTimeoutBuffer = duration;
return this;
}

/**
* Override the default log handler with the given log handler.
* @param handler A log handler instance.
Expand All @@ -151,7 +170,7 @@ public Builder logHandler(Handler handler) {
return this;
}

/**
/**
* Builds and returns a new FaunaConfig instance.
*
* @return A new instance of FaunaConfig.
Expand Down
27 changes: 19 additions & 8 deletions src/main/java/com/fauna/client/RequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.logging.Logger;

import static com.fauna.client.Logging.headersAsString;
Expand All @@ -31,6 +32,7 @@ public class RequestBuilder {
private static final String STREAM_PATH = "/stream/1";

private final HttpRequest.Builder baseRequestBuilder;
private final Duration clientTimeoutBuffer;
private final Logger logger;

static class FieldNames {
Expand All @@ -56,7 +58,7 @@ static class Headers {
static final String FORMAT = "X-Format";
}

public RequestBuilder(URI uri, String token, int maxContentionRetries, Logger logger) {
public RequestBuilder(URI uri, String token, int maxContentionRetries, Duration clientTimeoutBuffer, Logger logger) {
// DriverEnvironment is not needed outside the constructor for now.
DriverEnvironment env = new DriverEnvironment(DriverEnvironment.JvmDriver.JAVA);
this.baseRequestBuilder = HttpRequest.newBuilder().uri(uri).headers(
Expand All @@ -68,32 +70,36 @@ public RequestBuilder(URI uri, String token, int maxContentionRetries, Logger lo
RequestBuilder.Headers.MAX_CONTENTION_RETRIES, String.valueOf(maxContentionRetries),
Headers.AUTHORIZATION, buildAuthHeader(token)
);
this.clientTimeoutBuffer = clientTimeoutBuffer;
this.logger = logger;
}

public RequestBuilder(HttpRequest.Builder builder, Logger logger) {
public RequestBuilder(HttpRequest.Builder builder, Duration clientTimeoutBuffer, Logger logger) {
this.baseRequestBuilder = builder;
this.clientTimeoutBuffer = clientTimeoutBuffer;
this.logger = logger;
}

public static RequestBuilder queryRequestBuilder(FaunaConfig config, Logger logger) {
return new RequestBuilder(URI.create(config.getEndpoint() + QUERY_PATH), config.getSecret(), config.getMaxContentionRetries(), logger);
return new RequestBuilder(URI.create(config.getEndpoint() + QUERY_PATH), config.getSecret(), config.getMaxContentionRetries(), config.getClientTimeoutBuffer(), logger);
}

public static RequestBuilder streamRequestBuilder(FaunaConfig config, Logger logger) {
return new RequestBuilder(URI.create(config.getEndpoint() + STREAM_PATH), config.getSecret(), config.getMaxContentionRetries(), logger);
return new RequestBuilder(URI.create(config.getEndpoint() + STREAM_PATH), config.getSecret(), config.getMaxContentionRetries(), config.getClientTimeoutBuffer(), logger);
}

public RequestBuilder scopedRequestBuilder(String token) {
HttpRequest.Builder newBuilder = this.baseRequestBuilder.copy();
// .setHeader(..) clears existing headers (which we want) while .header(..) would append it :)
newBuilder.setHeader(Headers.AUTHORIZATION, buildAuthHeader(token));
return new RequestBuilder(newBuilder, logger);
return new RequestBuilder(newBuilder, clientTimeoutBuffer, logger);
}

private void logRequest(String body, HttpRequest req) {
logger.fine(MessageFormat.format("Fauna HTTP {0} Request to {1} (timeout {2}), headers: {3}",
req.method(), req.uri(), req.timeout(), headersAsString(req.headers())));
String timeout = req.timeout().map(
val -> MessageFormat.format(" (timeout: {0})", val)).orElse("");
logger.fine(MessageFormat.format("Fauna HTTP {0} Request to {1}{2}, headers: {3}",
req.method(), req.uri(), timeout, headersAsString(req.headers())));
logger.finest("Request body: " + body);
}

Expand Down Expand Up @@ -139,6 +145,7 @@ public String buildStreamRequestBody(StreamRequest request) throws IOException {

public HttpRequest buildStreamRequest(StreamRequest request) {
HttpRequest.Builder builder = baseRequestBuilder.copy();
request.getTimeout().ifPresent(builder::timeout);
try {
String body = buildStreamRequestBody(request);
HttpRequest req = builder.POST(HttpRequest.BodyPublishers.ofString(body)).build();
Expand Down Expand Up @@ -166,7 +173,11 @@ private HttpRequest.Builder getBuilder(QueryOptions options, Long lastTxnTs) {
builder.setHeader(Headers.LAST_TXN_TS, String.valueOf(lastTxnTs));
}
if (options != null) {
options.getTimeoutMillis().ifPresent(val -> builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val)));

options.getTimeoutMillis().ifPresent(val -> {
builder.timeout(Duration.ofMillis(val).plus(clientTimeoutBuffer));
builder.header(Headers.QUERY_TIMEOUT_MS, String.valueOf(val));
});
options.getLinearized().ifPresent(val -> builder.header(Headers.LINEARIZED, String.valueOf(val)));
options.getTypeCheck().ifPresent(val -> builder.header(Headers.TYPE_CHECK, String.valueOf(val)));
options.getTraceParent().ifPresent(val -> builder.header(Headers.TRACE_PARENT, val));
Expand Down
124 changes: 104 additions & 20 deletions src/main/java/com/fauna/stream/StreamRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fauna.query.StreamTokenResponse;

import java.time.Duration;
import java.util.Optional;

/**
Expand All @@ -10,48 +11,131 @@
public class StreamRequest {
private final String token;
private final String cursor;
private final Long start_ts;
private final Long startTs;
private final Duration timeout;

public StreamRequest(String token) {
StreamRequest(String token, String cursor, Long startTs, Duration timeout) {
this.token = token;
this.cursor = null;
this.start_ts = null;
this.cursor = cursor;
this.startTs = startTs;
this.timeout = timeout;
if (token == null || token.isEmpty()) {
throw new IllegalArgumentException("token cannot be null or empty");
}
if (cursor != null && startTs != null) {
throw new IllegalArgumentException("Only one of cursor, or start_ts can be set.");
}
}

public StreamRequest(String token, String cursor) {
this.token = token;
this.cursor = cursor;
this.start_ts = null;
if (token == null || token.isEmpty()) {
throw new IllegalArgumentException("token cannot be null or empty");
}
public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) {
return new StreamRequest(tokenResponse.getToken(), null, null, null);
}

public StreamRequest(String token, Long start_ts) {
this.token = token;
this.cursor = null;
this.start_ts = start_ts;
if (token == null || token.isEmpty()) {
throw new IllegalArgumentException("token cannot be null or empty");
public static class Builder {
final String token;
String cursor = null;
Long startTs = null;
Duration timeout = null;

/**
* Return a new StreamRequest.Builder instance with the given token.
* @param token A Fauna Stream token.
*/
public Builder(String token) {
this.token = token;
}

/**
* Return the current Builder instance with the given cursor.
* @param cursor A Fauna Stream cursor.
* @return The current Builder instance.
* @throws IllegalArgumentException If startTs has already been set.
*/
public Builder cursor(String cursor) {
if (this.startTs != null) {
throw new IllegalArgumentException("only one of cursor, or startTs can be set.");
}
this.cursor = cursor;
return this;
}

/**
* Return the current Builder instance with the given start timestamp.
* @param startTs A timestamp to start the stream at.
* @return The current Builder instance.
* @throws IllegalArgumentException If startTs has already been set.
*/
public Builder startTs(Long startTs) {
if (this.cursor != null) {
throw new IllegalArgumentException("only one of cursor, or startTs can be set.");
}
this.startTs = startTs;
return this;
}

/**
* Return the current builder instance with the given timeout.
* This timeout is the HTTP client timeout that is passed to java.net.http.HttpRequest.Builder.
* The Java documentation says that if "the response is not received within the specified timeout then an
* HttpTimeoutException is thrown". For streaming this means that the exception is thrown if the first
* headers/bytes are not recieved within the timeout.
*
* The default value is null if the user does not set this timeout.
* @param timeout A Duration representing the timeout.
* @return The current Builder instance.
*/
public Builder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

public StreamRequest build() {
return new StreamRequest(token, cursor, startTs, timeout);
}

}

public static StreamRequest fromTokenResponse(StreamTokenResponse tokenResponse) {
return new StreamRequest(tokenResponse.getToken());
/**
* Create a new StreamRequest.Builder instance.
* @param token The Fauna Stream token to use.
* @return A new StreamRequest.Builder instance.
*/
public static Builder builder(String token) {
return new Builder(token);
}

/**
* Stream token for the event stream to subscribe to.
* @return A String representing the Stream token.
*/
public String getToken() {
return token;
}

/**
* Cursor for a previous event. If provided, the stream replays any events that occurred after
* the cursor (exclusive).
* @return The cursor, or Optional.empty() if not provided.
*/
public Optional<String> getCursor() {
return Optional.ofNullable(cursor);
}

/**
* Stream start time in microseconds since the Unix epoch. This is typically a previous event's txn_ts
* (transaction timestamp).
* @return The stream start time, as a Long, or Optional.empty() if not provided.
*/
public Optional<Long> getStartTs() {
return Optional.ofNullable(start_ts);
return Optional.ofNullable(startTs);
}

/**
* Stream HTTP request timeout. This timeout is passed to java.net.http.HttpRequest.Builder. The default
* is null/empty.
* @return The timeout Duration, or Optional.empty() if not set.
*/
public Optional<Duration> getTimeout() {
return Optional.ofNullable(timeout);
}
}
5 changes: 4 additions & 1 deletion src/test/java/com/fauna/client/FaunaConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.time.Duration;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;

Expand Down Expand Up @@ -32,7 +33,9 @@ public void testOverridingDefaultFaunaConfig() {
.secret("foo")
.endpoint("endpoint")
.logHandler(handler)
.maxContentionRetries(1).build();
.maxContentionRetries(1)
.clientTimeoutBuffer(Duration.ofSeconds(1))
.build();
assertEquals("endpoint", config.getEndpoint());
assertEquals(Level.ALL, config.getLogHandler().getLevel());
assertEquals("foo", config.getSecret());
Expand Down
Loading
Loading