diff --git a/ingester-example/src/main/java/io/greptime/TestConnector.java b/ingester-example/src/main/java/io/greptime/TestConnector.java index 7f61dc6..ddc9ea3 100644 --- a/ingester-example/src/main/java/io/greptime/TestConnector.java +++ b/ingester-example/src/main/java/io/greptime/TestConnector.java @@ -74,6 +74,11 @@ public static GreptimeDB connectToDefaultDB() { // periodically. By default, the route tables will not be refreshed. .routeTableRefreshPeriodSeconds(-1) // Optional, the default value is fine. + // Timeout for health check, if the health check is not completed within the specified time, + // the health check will fail. + // The default is 1000 + .checkHealthTimeoutMs(1000) + // Optional, the default value is fine. // Sets the request router, The internal default implementation works well. // You don't need to set it unless you have special requirements. .router(null) diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index f39736e..769f08e 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -239,18 +239,24 @@ public String toString() { @Override public CompletableFuture> checkHealth() { - Map> futures = this.opts.getEndpoints().stream() - .collect(Collectors.toMap(Function.identity(), endpoint -> { - HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); - return this.invoke(endpoint, req, Context.newDefault()) - .thenApply(resp -> true) - .exceptionally(t -> false); // Handle failure and return false - })); - - return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])) - .thenApply( - ok -> futures.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - .join()))); + Map> futures = + this.opts.getEndpoints().stream().collect(Collectors.toMap(Function.identity(), this::doCheckHealth)); + + CompletableFuture all = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])); + + return all.thenApply(ok -> futures.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join()))); + } + + private CompletableFuture doCheckHealth(Endpoint endpoint) { + HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); + return invoke(endpoint, req, Context.newDefault(), this.opts.getCheckHealthTimeoutMs()) + .thenApply(resp -> true) + .exceptionally( + t -> { // Handle failure and return false + LOG.warn("Failed to check health for endpoint: {}", endpoint, t); + return false; + }); } /** diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index 0d321ca..9356c39 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -39,6 +39,7 @@ public class GreptimeOptions implements Copiable { public static final int DEFAULT_MAX_IN_FLIGHT_WRITE_POINTS = 10 * 65536; public static final int DEFAULT_DEFAULT_STREAM_MAX_WRITE_POINTS_PER_SECOND = 10 * 65536; public static final long DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS = 10 * 60; + public static final long DEFAULT_CHECK_HEALTH_TIMEOUT_MS = 1000; private List endpoints; private RpcOptions rpcOptions; @@ -158,6 +159,8 @@ public static final class Builder { // Refresh frequency of route tables. The background refreshes all route tables periodically. // If the value is less than or equal to 0, the route tables will not be refreshed. private long routeTableRefreshPeriodSeconds = DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS; + // Timeout for health check + private long checkHealthTimeoutMs = DEFAULT_CHECK_HEALTH_TIMEOUT_MS; // Authentication information private AuthInfo authInfo; // The request router @@ -273,6 +276,19 @@ public Builder routeTableRefreshPeriodSeconds(long routeTableRefreshPeriodSecond return this; } + /** + * Timeout for health check. The default is 1000ms. + * If the health check is not completed within the specified time, the health + * check will fail. + * + * @param checkHealthTimeoutMs timeout for health check + * @return this builder + */ + public Builder checkHealthTimeoutMs(long checkHealthTimeoutMs) { + this.checkHealthTimeoutMs = checkHealthTimeoutMs; + return this; + } + /** * Sets authentication information. If the DB is not required to authenticate, * we can ignore this. @@ -321,6 +337,7 @@ private RouterOptions routerOptions() { routerOpts.setEndpoints(this.endpoints); routerOpts.setRouter(this.router); routerOpts.setRefreshPeriodSeconds(this.routeTableRefreshPeriodSeconds); + routerOpts.setCheckHealthTimeoutMs(this.checkHealthTimeoutMs); return routerOpts; } diff --git a/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java b/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java index d5e578d..2f7c75a 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/RouterOptions.java @@ -34,6 +34,8 @@ public class RouterOptions implements Copiable { // all route tables periodically. If the value is less than or // equal to 0, the route tables will not be refreshed. private long refreshPeriodSeconds = -1; + // Timeout for health check + private long checkHealthTimeoutMs; private Router router; public RpcClient getRpcClient() { @@ -60,6 +62,14 @@ public void setRefreshPeriodSeconds(long refreshPeriodSeconds) { this.refreshPeriodSeconds = refreshPeriodSeconds; } + public long getCheckHealthTimeoutMs() { + return checkHealthTimeoutMs; + } + + public void setCheckHealthTimeoutMs(long checkHealthTimeoutMs) { + this.checkHealthTimeoutMs = checkHealthTimeoutMs; + } + public Router getRouter() { return router; } @@ -74,15 +84,18 @@ public RouterOptions copy() { opts.rpcClient = rpcClient; opts.endpoints = this.endpoints; opts.refreshPeriodSeconds = this.refreshPeriodSeconds; + opts.checkHealthTimeoutMs = this.checkHealthTimeoutMs; opts.router = this.router; return opts; } @Override public String toString() { - return "RouterOptions{" + "endpoints=" + return "RouterOptions{" + "rpcClient=" + + rpcClient + ", endpoints=" + endpoints + ", refreshPeriodSeconds=" - + refreshPeriodSeconds + ", router=" + + refreshPeriodSeconds + ", checkHealthTimeoutMs=" + + checkHealthTimeoutMs + ", router=" + router + '}'; } } diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java index fda491d..07efc9a 100644 --- a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java +++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java @@ -44,6 +44,7 @@ public void testAllOptions() { LimitedPolicy limitedPolicy = new LimitedPolicy.DiscardPolicy(); int defaultStreamMaxWritePointsPerSecond = 100000; long routeTableRefreshPeriodSeconds = 99; + long checkHealthTimeoutMs = 1000; AuthInfo authInfo = new AuthInfo("user", "password"); Router router = createTestRouter(); TlsOptions tlsOptions = new TlsOptions(); @@ -57,6 +58,7 @@ public void testAllOptions() { .writeLimitedPolicy(limitedPolicy) .defaultStreamMaxWritePointsPerSecond(defaultStreamMaxWritePointsPerSecond) .routeTableRefreshPeriodSeconds(routeTableRefreshPeriodSeconds) + .checkHealthTimeoutMs(checkHealthTimeoutMs) .authInfo(authInfo) .router(router) .build(); @@ -74,6 +76,7 @@ public void testAllOptions() { routerOptions.getEndpoints().stream().map(Endpoint::toString).toArray()); Assert.assertEquals(router, routerOptions.getRouter()); Assert.assertEquals(routeTableRefreshPeriodSeconds, routerOptions.getRefreshPeriodSeconds()); + Assert.assertEquals(checkHealthTimeoutMs, routerOptions.getCheckHealthTimeoutMs()); WriteOptions writeOptions = opts.getWriteOptions(); Assert.assertNotNull(writeOptions);