Skip to content

Commit

Permalink
feat: health check in RouteClient
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Sep 23, 2024
1 parent 680ce3c commit f6e84a1
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 50 deletions.
4 changes: 2 additions & 2 deletions ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
/**
* The GreptimeDB client.
*/
public class GreptimeDB implements Write, WriteObject, Lifecycle<GreptimeOptions>, HealthCheck, Display {
public class GreptimeDB implements Write, WriteObject, Lifecycle<GreptimeOptions>, Health, Display {

private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class);

Expand Down Expand Up @@ -178,7 +178,7 @@ public StreamWriter<Table, WriteOk> streamWriter(int maxPointsPerSecond, Context
}

@Override
public CompletableFuture<Boolean> is_alive() {
public CompletableFuture<Map<Endpoint, Boolean>> checkHealth() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package io.greptime;

import io.greptime.common.Endpoint;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Health check. It just like to probe the database and connections.
* Users can use this status to perform fault tolerance and disaster recovery actions.
*
* @author jiachun.fjc
*/
public interface HealthCheck {
CompletableFuture<Boolean> is_alive();
public interface Health {
CompletableFuture<Map<Endpoint, Boolean>> checkHealth();
}
15 changes: 4 additions & 11 deletions ingester-protocol/src/main/java/io/greptime/Router.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,10 @@ public interface Router<R, E> {
CompletableFuture<E> routeFor(R request);

/**
* Refresh the routing table from remote server.
* @return a future that will be completed when the refresh is done
*/
CompletableFuture<Boolean> refresh();

/**
* Refresh the routing table.
* We need to get all the endpoints, and this method will overwrite all
* current endpoints.
* Refresh the routing table. By health checker or service discovery.
*
* @param endpoints all new endpoints
* @param activities all activities endpoints
* @param inactivities all inactivities endpoints
*/
void onRefresh(List<E> endpoints);
void onRefresh(List<E> activities, List<E> inactivities);
}
91 changes: 73 additions & 18 deletions ingester-protocol/src/main/java/io/greptime/RouterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,25 @@
import io.greptime.rpc.Context;
import io.greptime.rpc.Observer;
import io.greptime.rpc.RpcClient;
import io.greptime.v1.Health.HealthCheckRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A route rpc client which cached the routing table information locally
* and will auto refresh.
*/
public class RouterClient implements Lifecycle<RouterOptions>, Display {
public class RouterClient implements Lifecycle<RouterOptions>, Health, Display {

private static final Logger LOG = LoggerFactory.getLogger(RouterClient.class);

Expand All @@ -57,19 +62,29 @@ public boolean init(RouterOptions opts) {
List<Endpoint> endpoints = Ensures.ensureNonNull(this.opts.getEndpoints(), "null `endpoints`");

this.router = new DefaultRouter();
this.router.onRefresh(endpoints);
this.router.onRefresh(endpoints, null);

long refreshPeriod = this.opts.getRefreshPeriodSeconds();
if (refreshPeriod > 0) {
this.refresher = REFRESHER_POOL.getObject();
this.refresher.scheduleWithFixedDelay(
() -> this.router.refresh().whenComplete((r, e) -> {
if (e != null) {
LOG.error("Router cache refresh failed.", e);
} else {
LOG.debug("Router cache refresh {}.", r ? "success" : "failed");
() -> {
try {
Map<Endpoint, Boolean> health = this.checkHealth().get();
List<Endpoint> activities = new ArrayList<>();
List<Endpoint> inactivities = new ArrayList<>();
for (Map.Entry<Endpoint, Boolean> entry : health.entrySet()) {
if (entry.getValue()) {
activities.add(entry.getKey());
} else {
inactivities.add(entry.getKey());
}
}
this.router.onRefresh(activities, inactivities);
} catch (Throwable t) {
LOG.warn("Failed to check health", t);
}
}),
},
Util.randomInitialDelay(180),
refreshPeriod,
TimeUnit.SECONDS);
Expand Down Expand Up @@ -204,6 +219,22 @@ public String toString() {
return "RouterClient{" + "refresher=" + refresher + ", opts=" + opts + ", rpcClient=" + rpcClient + '}';
}

@Override
public CompletableFuture<Map<Endpoint, Boolean>> checkHealth() {
Map<Endpoint, CompletableFuture<Boolean>> futures = this.opts.getEndpoints().stream()
.collect(Collectors.toMap(Function.identity(), endpoint -> {
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
return this.invoke(endpoint, req, Context.newDefault())
.thenApply(resp -> true)
.exceptionally(t -> false); // Handle failure and return false
}));

return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
.thenApply(
ok -> futures.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()
.join())));
}

/**
* Request to a `frontend` server, which needs to return all members(frontend server),
* or it can return only one domain address, it is also possible to return no address
Expand All @@ -219,25 +250,49 @@ public String toString() {
*/
private static class DefaultRouter implements Router<Void, Endpoint> {

private final AtomicReference<List<Endpoint>> endpointsRef = new AtomicReference<>();
private final AtomicReference<Endpoints> endpointsRef = new AtomicReference<>();

@Override
public CompletableFuture<Endpoint> routeFor(Void request) {
List<Endpoint> endpoints = this.endpointsRef.get();
Endpoints endpoints = this.endpointsRef.get();

if (endpoints == null) {
return Util.errorCf(new IllegalStateException("null `endpoints`"));
}

ThreadLocalRandom random = ThreadLocalRandom.current();
int i = random.nextInt(0, endpoints.size());
return Util.completedCf(endpoints.get(i));

if (!endpoints.activities.isEmpty()) {
int i = random.nextInt(0, endpoints.activities.size());
return Util.completedCf(endpoints.activities.get(i));
}

if (!endpoints.inactivities.isEmpty()) {
int i = random.nextInt(0, endpoints.inactivities.size());
Endpoint goodLuck = endpoints.inactivities.get(i);
LOG.warn("No active endpoint, return an inactive one: {}", goodLuck);
return Util.completedCf(goodLuck);
}

return Util.errorCf(new IllegalStateException("empty `endpoints`"));
}

@Override
public CompletableFuture<Boolean> refresh() {
// always return true
return Util.completedCf(true);
public void onRefresh(List<Endpoint> activities, List<Endpoint> inactivities) {
if (inactivities != null && !inactivities.isEmpty()) {
LOG.warn("Some endpoints are inactive: {}", inactivities);
}
this.endpointsRef.set(new Endpoints(activities, inactivities));
}
}

@Override
public void onRefresh(List<Endpoint> endpoints) {
this.endpointsRef.set(endpoints);
static class Endpoints {
final List<Endpoint> activities;
final List<Endpoint> inactivities;

Endpoints(List<Endpoint> activities, List<Endpoint> inactivities) {
this.activities = activities == null ? new ArrayList<>() : activities;
this.inactivities = inactivities == null ? new ArrayList<>() : inactivities;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,22 @@ public class RpcServiceRegister {

public static void registerAllService() {
// Handle
MethodDescriptor handleMethod = MethodDescriptor
.of(String.format(DATABASE_METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1);
MethodDescriptor handleMethod = MethodDescriptor.of(
String.format(DATABASE_METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1);
RpcFactoryProvider.getRpcFactory()
.register(handleMethod,
.register(
handleMethod,
Database.GreptimeRequest.class,
Database.GreptimeRequest.getDefaultInstance(),
Database.GreptimeResponse.getDefaultInstance());

// HandleRequests
MethodDescriptor handleRequestsMethod = MethodDescriptor
.of(String.format(DATABASE_METHOD_TEMPLATE, "HandleRequests"), MethodDescriptor.MethodType.CLIENT_STREAMING);
MethodDescriptor handleRequestsMethod = MethodDescriptor.of(
String.format(DATABASE_METHOD_TEMPLATE, "HandleRequests"),
MethodDescriptor.MethodType.CLIENT_STREAMING);
RpcFactoryProvider.getRpcFactory()
.register(handleRequestsMethod,
.register(
handleRequestsMethod,
Database.GreptimeRequest.class,
Database.GreptimeRequest.getDefaultInstance(),
Database.GreptimeResponse.getDefaultInstance());
Expand All @@ -52,7 +55,8 @@ public static void registerAllService() {
MethodDescriptor healthCheckMethod = MethodDescriptor.of(
String.format(HEALTH_METHOD_TEMPLATE, "HealthCheck"), MethodDescriptor.MethodType.UNARY);
RpcFactoryProvider.getRpcFactory()
.register(healthCheckMethod,
.register(
healthCheckMethod,
Health.HealthCheckRequest.class,
Health.HealthCheckRequest.getDefaultInstance(),
Health.HealthCheckResponse.getDefaultInstance());
Expand Down
14 changes: 14 additions & 0 deletions ingester-protocol/src/main/java/io/greptime/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ public static <U> CompletableFuture<U> completedCf(U value) {
return CompletableFuture.completedFuture(value);
}

/**
* Returns a new CompletableFuture that is already exceptionally with the given
* error.
*
* @param t the given exception
* @param <U> the type of the value
* @return the exceptionally {@link CompletableFuture}
*/
public static <U> CompletableFuture<U> errorCf(Throwable t) {
CompletableFuture<U> err = new CompletableFuture<>();
err.completeExceptionally(t);
return err;
}

public static <V> Observer<V> toObserver(CompletableFuture<V> future) {
return new Observer<V>() {

Expand Down
8 changes: 7 additions & 1 deletion ingester-protocol/src/main/java/io/greptime/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.greptime.v1.Common;
import io.greptime.v1.Database;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand All @@ -56,7 +57,7 @@
/**
* Default Write API impl.
*/
public class WriteClient implements Write, Lifecycle<WriteOptions>, Display {
public class WriteClient implements Write, Health, Lifecycle<WriteOptions>, Display {

private static final Logger LOG = LoggerFactory.getLogger(WriteClient.class);

Expand Down Expand Up @@ -254,6 +255,11 @@ public void onCompleted() {
};
}

@Override
public CompletableFuture<Map<Endpoint, Boolean>> checkHealth() {
return this.routerClient.checkHealth();
}

@Override
public void display(Printer out) {
out.println("--- WriteClient ---")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,13 @@ public void testAllOptions() {

private Router<Void, Endpoint> createTestRouter() {
return new Router<Void, Endpoint>() {

@Override
public CompletableFuture<Endpoint> routeFor(Void request) {
return null;
}

@Override
public CompletableFuture<Boolean> refresh() {
return null;
}

@Override
public void onRefresh(List<Endpoint> endpoints) {}
public void onRefresh(List<Endpoint> activities, List<Endpoint> inactivities) {}
};
}
}

0 comments on commit f6e84a1

Please sign in to comment.