From 680ce3cc0dee9adcd8f067eaa5d28ef06c7933e3 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 10 Sep 2024 20:32:33 +0800 Subject: [PATCH 1/8] feat: health check --- .../src/main/java/io/greptime/GreptimeDB.java | 7 ++++- .../main/java/io/greptime/HealthCheck.java | 29 +++++++++++++++++++ .../java/io/greptime/RpcServiceRegister.java | 29 +++++++++++++------ 3 files changed, 55 insertions(+), 10 deletions(-) create mode 100644 ingester-protocol/src/main/java/io/greptime/HealthCheck.java diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 5f7a527..83bdae6 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -53,7 +53,7 @@ /** * The GreptimeDB client. */ -public class GreptimeDB implements Write, WriteObject, Lifecycle, Display { +public class GreptimeDB implements Write, WriteObject, Lifecycle, HealthCheck, Display { private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class); @@ -177,6 +177,11 @@ public StreamWriter streamWriter(int maxPointsPerSecond, Context return this.writeClient.streamWriter(maxPointsPerSecond, attachCtx(ctx)); } + @Override + public CompletableFuture is_alive() { + return null; + } + @Override public void display(Printer out) { out.println("--- GreptimeDB Client ---") diff --git a/ingester-protocol/src/main/java/io/greptime/HealthCheck.java b/ingester-protocol/src/main/java/io/greptime/HealthCheck.java new file mode 100644 index 0000000..04f48f4 --- /dev/null +++ b/ingester-protocol/src/main/java/io/greptime/HealthCheck.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.greptime; + +import java.util.concurrent.CompletableFuture; + +/** + * Health check. It just like to probe the database and connections. + * Users can use this status to perform fault tolerance and disaster recovery actions. + * + * @author jiachun.fjc + */ +public interface HealthCheck { + CompletableFuture is_alive(); +} diff --git a/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java b/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java index 6426578..f1c97e6 100644 --- a/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java +++ b/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java @@ -19,31 +19,42 @@ import io.greptime.rpc.MethodDescriptor; import io.greptime.rpc.RpcFactoryProvider; import io.greptime.v1.Database; +import io.greptime.v1.Health; /** * The RPC service register. */ public class RpcServiceRegister { - private static final String METHOD_TEMPLATE = "greptime.v1.GreptimeDatabase/%s"; + private static final String DATABASE_METHOD_TEMPLATE = "greptime.v1.GreptimeDatabase/%s"; + private static final String HEALTH_METHOD_TEMPLATE = "greptime.v1.Health/%s"; public static void registerAllService() { - // register protobuf serializer + // Handle + MethodDescriptor handleMethod = MethodDescriptor + .of(String.format(DATABASE_METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1); RpcFactoryProvider.getRpcFactory() - .register( - MethodDescriptor.of( - String.format(METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1), + .register(handleMethod, Database.GreptimeRequest.class, Database.GreptimeRequest.getDefaultInstance(), Database.GreptimeResponse.getDefaultInstance()); + // HandleRequests + MethodDescriptor handleRequestsMethod = MethodDescriptor + .of(String.format(DATABASE_METHOD_TEMPLATE, "HandleRequests"), MethodDescriptor.MethodType.CLIENT_STREAMING); RpcFactoryProvider.getRpcFactory() - .register( - MethodDescriptor.of( - String.format(METHOD_TEMPLATE, "HandleRequests"), - MethodDescriptor.MethodType.CLIENT_STREAMING), + .register(handleRequestsMethod, Database.GreptimeRequest.class, Database.GreptimeRequest.getDefaultInstance(), Database.GreptimeResponse.getDefaultInstance()); + + // HealthCheck + MethodDescriptor healthCheckMethod = MethodDescriptor.of( + String.format(HEALTH_METHOD_TEMPLATE, "HealthCheck"), MethodDescriptor.MethodType.UNARY); + RpcFactoryProvider.getRpcFactory() + .register(healthCheckMethod, + Health.HealthCheckRequest.class, + Health.HealthCheckRequest.getDefaultInstance(), + Health.HealthCheckResponse.getDefaultInstance()); } } From e2da25b3b817be71907d7dab02903e5c91552b9f Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 11 Sep 2024 10:35:50 +0800 Subject: [PATCH 2/8] feat: health check in RouteClient --- .../src/main/java/io/greptime/GreptimeDB.java | 6 +- .../{HealthCheck.java => Health.java} | 8 +- .../src/main/java/io/greptime/Router.java | 15 +-- .../main/java/io/greptime/RouterClient.java | 91 +++++++++++++++---- .../java/io/greptime/RpcServiceRegister.java | 20 ++-- .../src/main/java/io/greptime/Util.java | 14 +++ .../main/java/io/greptime/WriteClient.java | 8 +- .../greptime/options/GreptimeOptionsTest.java | 8 +- 8 files changed, 118 insertions(+), 52 deletions(-) rename ingester-protocol/src/main/java/io/greptime/{HealthCheck.java => Health.java} (85%) diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 83bdae6..adc6a38 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -53,7 +53,7 @@ /** * The GreptimeDB client. */ -public class GreptimeDB implements Write, WriteObject, Lifecycle, HealthCheck, Display { +public class GreptimeDB implements Write, WriteObject, Lifecycle, Health, Display { private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class); @@ -178,8 +178,8 @@ public StreamWriter streamWriter(int maxPointsPerSecond, Context } @Override - public CompletableFuture is_alive() { - return null; + public CompletableFuture> checkHealth() { + return this.routerClient.checkHealth(); } @Override diff --git a/ingester-protocol/src/main/java/io/greptime/HealthCheck.java b/ingester-protocol/src/main/java/io/greptime/Health.java similarity index 85% rename from ingester-protocol/src/main/java/io/greptime/HealthCheck.java rename to ingester-protocol/src/main/java/io/greptime/Health.java index 04f48f4..5b93b77 100644 --- a/ingester-protocol/src/main/java/io/greptime/HealthCheck.java +++ b/ingester-protocol/src/main/java/io/greptime/Health.java @@ -16,14 +16,14 @@ package io.greptime; +import io.greptime.common.Endpoint; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** * Health check. It just like to probe the database and connections. * Users can use this status to perform fault tolerance and disaster recovery actions. - * - * @author jiachun.fjc */ -public interface HealthCheck { - CompletableFuture is_alive(); +public interface Health { + CompletableFuture> checkHealth(); } diff --git a/ingester-protocol/src/main/java/io/greptime/Router.java b/ingester-protocol/src/main/java/io/greptime/Router.java index 88b32af..6731148 100644 --- a/ingester-protocol/src/main/java/io/greptime/Router.java +++ b/ingester-protocol/src/main/java/io/greptime/Router.java @@ -33,17 +33,10 @@ public interface Router { CompletableFuture routeFor(R request); /** - * Refresh the routing table from remote server. - * @return a future that will be completed when the refresh is done - */ - CompletableFuture refresh(); - - /** - * Refresh the routing table. - * We need to get all the endpoints, and this method will overwrite all - * current endpoints. + * Refresh the routing table. By health checker or service discovery. * - * @param endpoints all new endpoints + * @param activities all activities endpoints + * @param inactivities all inactivities endpoints */ - void onRefresh(List endpoints); + void onRefresh(List activities, List inactivities); } diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index ed7756f..94eedb6 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -25,12 +25,17 @@ import io.greptime.rpc.Context; import io.greptime.rpc.Observer; import io.greptime.rpc.RpcClient; +import io.greptime.v1.Health.HealthCheckRequest; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +43,7 @@ * A route rpc client which cached the routing table information locally * and will auto refresh. */ -public class RouterClient implements Lifecycle, Display { +public class RouterClient implements Lifecycle, Health, Display { private static final Logger LOG = LoggerFactory.getLogger(RouterClient.class); @@ -57,19 +62,29 @@ public boolean init(RouterOptions opts) { List endpoints = Ensures.ensureNonNull(this.opts.getEndpoints(), "null `endpoints`"); this.router = new DefaultRouter(); - this.router.onRefresh(endpoints); + this.router.onRefresh(endpoints, null); long refreshPeriod = this.opts.getRefreshPeriodSeconds(); if (refreshPeriod > 0) { this.refresher = REFRESHER_POOL.getObject(); this.refresher.scheduleWithFixedDelay( - () -> this.router.refresh().whenComplete((r, e) -> { - if (e != null) { - LOG.error("Router cache refresh failed.", e); - } else { - LOG.debug("Router cache refresh {}.", r ? "success" : "failed"); + () -> { + try { + Map health = this.checkHealth().get(); + List activities = new ArrayList<>(); + List inactivities = new ArrayList<>(); + for (Map.Entry entry : health.entrySet()) { + if (entry.getValue()) { + activities.add(entry.getKey()); + } else { + inactivities.add(entry.getKey()); + } + } + this.router.onRefresh(activities, inactivities); + } catch (Throwable t) { + LOG.warn("Failed to check health", t); } - }), + }, Util.randomInitialDelay(180), refreshPeriod, TimeUnit.SECONDS); @@ -204,6 +219,22 @@ public String toString() { return "RouterClient{" + "refresher=" + refresher + ", opts=" + opts + ", rpcClient=" + rpcClient + '}'; } + @Override + public CompletableFuture> checkHealth() { + Map> futures = this.opts.getEndpoints().stream() + .collect(Collectors.toMap(Function.identity(), endpoint -> { + HealthCheckRequest req = HealthCheckRequest.newBuilder().build(); + return this.invoke(endpoint, req, Context.newDefault()) + .thenApply(resp -> true) + .exceptionally(t -> false); // Handle failure and return false + })); + + return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])) + .thenApply( + ok -> futures.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() + .join()))); + } + /** * Request to a `frontend` server, which needs to return all members(frontend server), * or it can return only one domain address, it is also possible to return no address @@ -219,25 +250,49 @@ public String toString() { */ private static class DefaultRouter implements Router { - private final AtomicReference> endpointsRef = new AtomicReference<>(); + private final AtomicReference endpointsRef = new AtomicReference<>(); @Override public CompletableFuture routeFor(Void request) { - List endpoints = this.endpointsRef.get(); + Endpoints endpoints = this.endpointsRef.get(); + + if (endpoints == null) { + return Util.errorCf(new IllegalStateException("null `endpoints`")); + } + ThreadLocalRandom random = ThreadLocalRandom.current(); - int i = random.nextInt(0, endpoints.size()); - return Util.completedCf(endpoints.get(i)); + + if (!endpoints.activities.isEmpty()) { + int i = random.nextInt(0, endpoints.activities.size()); + return Util.completedCf(endpoints.activities.get(i)); + } + + if (!endpoints.inactivities.isEmpty()) { + int i = random.nextInt(0, endpoints.inactivities.size()); + Endpoint goodLuck = endpoints.inactivities.get(i); + LOG.warn("No active endpoint, return an inactive one: {}", goodLuck); + return Util.completedCf(goodLuck); + } + + return Util.errorCf(new IllegalStateException("empty `endpoints`")); } @Override - public CompletableFuture refresh() { - // always return true - return Util.completedCf(true); + public void onRefresh(List activities, List inactivities) { + if (inactivities != null && !inactivities.isEmpty()) { + LOG.warn("Some endpoints are inactive: {}", inactivities); + } + this.endpointsRef.set(new Endpoints(activities, inactivities)); } + } - @Override - public void onRefresh(List endpoints) { - this.endpointsRef.set(endpoints); + static class Endpoints { + final List activities; + final List inactivities; + + Endpoints(List activities, List inactivities) { + this.activities = activities == null ? new ArrayList<>() : activities; + this.inactivities = inactivities == null ? new ArrayList<>() : inactivities; } } } diff --git a/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java b/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java index f1c97e6..052ead9 100644 --- a/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java +++ b/ingester-protocol/src/main/java/io/greptime/RpcServiceRegister.java @@ -27,23 +27,26 @@ public class RpcServiceRegister { private static final String DATABASE_METHOD_TEMPLATE = "greptime.v1.GreptimeDatabase/%s"; - private static final String HEALTH_METHOD_TEMPLATE = "greptime.v1.Health/%s"; + private static final String HEALTH_METHOD_TEMPLATE = "greptime.v1.HealthCheck/%s"; public static void registerAllService() { // Handle - MethodDescriptor handleMethod = MethodDescriptor - .of(String.format(DATABASE_METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1); + MethodDescriptor handleMethod = MethodDescriptor.of( + String.format(DATABASE_METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1); RpcFactoryProvider.getRpcFactory() - .register(handleMethod, + .register( + handleMethod, Database.GreptimeRequest.class, Database.GreptimeRequest.getDefaultInstance(), Database.GreptimeResponse.getDefaultInstance()); // HandleRequests - MethodDescriptor handleRequestsMethod = MethodDescriptor - .of(String.format(DATABASE_METHOD_TEMPLATE, "HandleRequests"), MethodDescriptor.MethodType.CLIENT_STREAMING); + MethodDescriptor handleRequestsMethod = MethodDescriptor.of( + String.format(DATABASE_METHOD_TEMPLATE, "HandleRequests"), + MethodDescriptor.MethodType.CLIENT_STREAMING); RpcFactoryProvider.getRpcFactory() - .register(handleRequestsMethod, + .register( + handleRequestsMethod, Database.GreptimeRequest.class, Database.GreptimeRequest.getDefaultInstance(), Database.GreptimeResponse.getDefaultInstance()); @@ -52,7 +55,8 @@ public static void registerAllService() { MethodDescriptor healthCheckMethod = MethodDescriptor.of( String.format(HEALTH_METHOD_TEMPLATE, "HealthCheck"), MethodDescriptor.MethodType.UNARY); RpcFactoryProvider.getRpcFactory() - .register(healthCheckMethod, + .register( + healthCheckMethod, Health.HealthCheckRequest.class, Health.HealthCheckRequest.getDefaultInstance(), Health.HealthCheckResponse.getDefaultInstance()); diff --git a/ingester-protocol/src/main/java/io/greptime/Util.java b/ingester-protocol/src/main/java/io/greptime/Util.java index ce5a53d..8b6126d 100644 --- a/ingester-protocol/src/main/java/io/greptime/Util.java +++ b/ingester-protocol/src/main/java/io/greptime/Util.java @@ -138,6 +138,20 @@ public static CompletableFuture completedCf(U value) { return CompletableFuture.completedFuture(value); } + /** + * Returns a new CompletableFuture that is already exceptionally with the given + * error. + * + * @param t the given exception + * @param the type of the value + * @return the exceptionally {@link CompletableFuture} + */ + public static CompletableFuture errorCf(Throwable t) { + CompletableFuture err = new CompletableFuture<>(); + err.completeExceptionally(t); + return err; + } + public static Observer toObserver(CompletableFuture future) { return new Observer() { diff --git a/ingester-protocol/src/main/java/io/greptime/WriteClient.java b/ingester-protocol/src/main/java/io/greptime/WriteClient.java index eb23659..246a0aa 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteClient.java @@ -47,6 +47,7 @@ import io.greptime.v1.Common; import io.greptime.v1.Database; import java.util.Collection; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -56,7 +57,7 @@ /** * Default Write API impl. */ -public class WriteClient implements Write, Lifecycle, Display { +public class WriteClient implements Write, Health, Lifecycle, Display { private static final Logger LOG = LoggerFactory.getLogger(WriteClient.class); @@ -254,6 +255,11 @@ public void onCompleted() { }; } + @Override + public CompletableFuture> checkHealth() { + return this.routerClient.checkHealth(); + } + @Override public void display(Printer out) { out.println("--- WriteClient ---") diff --git a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java index a5b10b8..fda491d 100644 --- a/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java +++ b/ingester-protocol/src/test/java/io/greptime/options/GreptimeOptionsTest.java @@ -88,19 +88,13 @@ public void testAllOptions() { private Router createTestRouter() { return new Router() { - @Override public CompletableFuture routeFor(Void request) { return null; } @Override - public CompletableFuture refresh() { - return null; - } - - @Override - public void onRefresh(List endpoints) {} + public void onRefresh(List activities, List inactivities) {} }; } } From ae029796a9c151ab181407407c9995cca5ca42f3 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 25 Sep 2024 10:49:26 +0800 Subject: [PATCH 3/8] chore: default options values --- .../java/io/greptime/options/GreptimeOptions.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java index f35d6b9..0d321ca 100644 --- a/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java +++ b/ingester-protocol/src/main/java/io/greptime/options/GreptimeOptions.java @@ -35,6 +35,11 @@ * GreptimeDB client options. */ public class GreptimeOptions implements Copiable { + public static final int DEFAULT_WRITE_MAX_RETRIES = 1; + public static final int DEFAULT_MAX_IN_FLIGHT_WRITE_POINTS = 10 * 65536; + public static final int DEFAULT_DEFAULT_STREAM_MAX_WRITE_POINTS_PER_SECOND = 10 * 65536; + public static final long DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS = 10 * 60; + private List endpoints; private RpcOptions rpcOptions; private RouterOptions routerOptions; @@ -145,14 +150,14 @@ public static final class Builder { private RpcOptions rpcOptions = RpcOptions.newDefault(); // GreptimeDB secure connection options private TlsOptions tlsOptions; - private int writeMaxRetries = 1; + private int writeMaxRetries = DEFAULT_WRITE_MAX_RETRIES; // Write flow limit: maximum number of data points in-flight. - private int maxInFlightWritePoints = 10 * 65536; + private int maxInFlightWritePoints = DEFAULT_MAX_IN_FLIGHT_WRITE_POINTS; private LimitedPolicy writeLimitedPolicy = LimitedPolicy.defaultWriteLimitedPolicy(); - private int defaultStreamMaxWritePointsPerSecond = 10 * 65536; + private int defaultStreamMaxWritePointsPerSecond = DEFAULT_DEFAULT_STREAM_MAX_WRITE_POINTS_PER_SECOND; // Refresh frequency of route tables. The background refreshes all route tables periodically. // If the value is less than or equal to 0, the route tables will not be refreshed. - private long routeTableRefreshPeriodSeconds = -1; + private long routeTableRefreshPeriodSeconds = DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS; // Authentication information private AuthInfo authInfo; // The request router From 8b498bb19419593c639a53542779b3adda784cf8 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 25 Sep 2024 11:59:11 +0800 Subject: [PATCH 4/8] feat: async health check task --- .../main/java/io/greptime/RouterClient.java | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index 94eedb6..a569513 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -66,14 +67,28 @@ public boolean init(RouterOptions opts) { long refreshPeriod = this.opts.getRefreshPeriodSeconds(); if (refreshPeriod > 0) { + AtomicLong order = new AtomicLong(0); this.refresher = REFRESHER_POOL.getObject(); this.refresher.scheduleWithFixedDelay( () -> { - try { - Map health = this.checkHealth().get(); + long thisOrder = order.incrementAndGet(); + checkHealth().whenComplete((r, t) -> { + if (t != null) { + LOG.warn("Failed to check health", t); + return; + } + + // I don't want to worry about the overflow issue with long anymore, + // because assuming one increment per second, it will take 292 years + // to overflow. I think that's sufficient. + if (thisOrder < order.get()) { + LOG.warn("Skip outdated health check result, order: {}", thisOrder); + return; + } + List activities = new ArrayList<>(); List inactivities = new ArrayList<>(); - for (Map.Entry entry : health.entrySet()) { + for (Map.Entry entry : r.entrySet()) { if (entry.getValue()) { activities.add(entry.getKey()); } else { @@ -81,9 +96,7 @@ public boolean init(RouterOptions opts) { } } this.router.onRefresh(activities, inactivities); - } catch (Throwable t) { - LOG.warn("Failed to check health", t); - } + }); }, Util.randomInitialDelay(180), refreshPeriod, @@ -279,9 +292,7 @@ public CompletableFuture routeFor(Void request) { @Override public void onRefresh(List activities, List inactivities) { - if (inactivities != null && !inactivities.isEmpty()) { - LOG.warn("Some endpoints are inactive: {}", inactivities); - } + LOG.info("Router cache refreshed, activities: {}, inactivities: {}", activities, inactivities); this.endpointsRef.set(new Endpoints(activities, inactivities)); } } From 1b95cb9e6582c4b602a13c999992e40bb2a5a598 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 25 Sep 2024 14:43:49 +0800 Subject: [PATCH 5/8] fix: atomic refresh --- .../main/java/io/greptime/RouterClient.java | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index a569513..9e04ca8 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -50,6 +50,8 @@ public class RouterClient implements Lifecycle, Health, Display { private static final SharedScheduledPool REFRESHER_POOL = Util.getSharedScheduledPool("route_cache_refresher", 1); + private final AtomicLong refreshSequencer = new AtomicLong(0); + private ScheduledExecutorService refresher; private RouterOptions opts; private RpcClient rpcClient; @@ -67,35 +69,36 @@ public boolean init(RouterOptions opts) { long refreshPeriod = this.opts.getRefreshPeriodSeconds(); if (refreshPeriod > 0) { - AtomicLong order = new AtomicLong(0); this.refresher = REFRESHER_POOL.getObject(); this.refresher.scheduleWithFixedDelay( () -> { - long thisOrder = order.incrementAndGet(); + long thisSequence = this.refreshSequencer.incrementAndGet(); checkHealth().whenComplete((r, t) -> { if (t != null) { LOG.warn("Failed to check health", t); return; } - // I don't want to worry about the overflow issue with long anymore, - // because assuming one increment per second, it will take 292 years - // to overflow. I think that's sufficient. - if (thisOrder < order.get()) { - LOG.warn("Skip outdated health check result, order: {}", thisOrder); - return; - } + synchronized (this.refreshSequencer) { + // I don't want to worry about the overflow issue with long anymore, + // because assuming one increment per second, it will take 292 years + // to overflow. I think that's sufficient. + if (thisSequence < this.refreshSequencer.get()) { + LOG.warn("Skip outdated health check result, sequence: {}", thisSequence); + return; + } - List activities = new ArrayList<>(); - List inactivities = new ArrayList<>(); - for (Map.Entry entry : r.entrySet()) { - if (entry.getValue()) { - activities.add(entry.getKey()); - } else { - inactivities.add(entry.getKey()); + List activities = new ArrayList<>(); + List inactivities = new ArrayList<>(); + for (Map.Entry entry : r.entrySet()) { + if (entry.getValue()) { + activities.add(entry.getKey()); + } else { + inactivities.add(entry.getKey()); + } } + this.router.onRefresh(activities, inactivities); } - this.router.onRefresh(activities, inactivities); }); }, Util.randomInitialDelay(180), From b45001e403bbf43c6a78dbdff6862a53d17581ea Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 25 Sep 2024 15:08:15 +0800 Subject: [PATCH 6/8] chore: add comment --- ingester-protocol/src/main/java/io/greptime/RouterClient.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index 9e04ca8..f39736e 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -80,6 +80,8 @@ public boolean init(RouterOptions opts) { } synchronized (this.refreshSequencer) { + // If the next task has started, we will ignore the current result. + // // I don't want to worry about the overflow issue with long anymore, // because assuming one increment per second, it will take 292 years // to overflow. I think that's sufficient. From c1673893ae7cbebd912519e09c10729918385625 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 25 Sep 2024 16:11:48 +0800 Subject: [PATCH 7/8] chore: lock this object --- ingester-protocol/src/main/java/io/greptime/RouterClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index f39736e..b1532f5 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -79,7 +79,7 @@ public boolean init(RouterOptions opts) { return; } - synchronized (this.refreshSequencer) { + synchronized (this) { // If the next task has started, we will ignore the current result. // // I don't want to worry about the overflow issue with long anymore, From 34c75f8cd94b474414edc3a5880b55746dd9312e Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Thu, 26 Sep 2024 08:22:07 +0800 Subject: [PATCH 8/8] chore: lock a clearly object --- ingester-protocol/src/main/java/io/greptime/RouterClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index b1532f5..f39736e 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -79,7 +79,7 @@ public boolean init(RouterOptions opts) { return; } - synchronized (this) { + synchronized (this.refreshSequencer) { // If the next task has started, we will ignore the current result. // // I don't want to worry about the overflow issue with long anymore,