From 9ee93297417cf5d0f9290730af5d1780f7cfdf32 Mon Sep 17 00:00:00 2001 From: Li Date: Thu, 5 Mar 2020 15:23:46 -0800 Subject: [PATCH 1/2] add cluster master health check monitoring --- .../ha/clustermonitor/HealthChecker.java | 42 ++++++------------- .../ha/module/HaGatewayProviderModule.java | 12 +++++- .../dao/GatewayBackendStateHistory.java | 39 +++++++++++++++++ .../ha/resource/EntityEditorResource.java | 7 +++- .../ha/router/GatewayBackendStateManager.java | 24 +++++++++++ .../router/HaGatewayBackendStateManager.java | 41 ++++++++++++++++++ 6 files changed, 134 insertions(+), 31 deletions(-) create mode 100644 gateway-ha/src/main/java/com/lyft/data/gateway/ha/persistence/dao/GatewayBackendStateHistory.java create mode 100644 gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/GatewayBackendStateManager.java create mode 100644 gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/HaGatewayBackendStateManager.java diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/HealthChecker.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/HealthChecker.java index 219ae6bd..1987ba4b 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/HealthChecker.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/HealthChecker.java @@ -1,11 +1,17 @@ package com.lyft.data.gateway.ha.clustermonitor; +import com.lyft.data.gateway.ha.config.DataStoreConfiguration; import com.lyft.data.gateway.ha.notifier.Notifier; +import com.lyft.data.gateway.ha.persistence.JdbcConnectionManager; +import com.lyft.data.gateway.ha.router.GatewayBackendStateManager.GatewayBackendState; +import com.lyft.data.gateway.ha.router.HaGatewayBackendStateManager; import java.util.List; + public class HealthChecker implements PrestoClusterStatsObserver { private static final int MAX_THRESHOLD_QUEUED_QUERY_COUNT = 100; private Notifier notifier; + private HaGatewayBackendStateManager haGatewayBackendStateManager; 
public HealthChecker(Notifier notifier) { this.notifier = notifier; @@ -13,35 +19,13 @@ public HealthChecker(Notifier notifier) { @Override public void observe(List clustersStats) { - for (ClusterStats clusterStats : clustersStats) { - if (!clusterStats.isHealthy()) { - notifyUnhealthyCluster(clusterStats); - } else { - if (clusterStats.getQueuedQueryCount() > MAX_THRESHOLD_QUEUED_QUERY_COUNT) { - notifyForTooManyQueuedQueries(clusterStats); - } - if (clusterStats.getNumWorkerNodes() < 1) { - notifyForNoWorkers(clusterStats); - } - } + for (ClusterStats clusterStat : clustersStats) { + // TODO: Store the stats in a persistent layer + GatewayBackendState backend = new GatewayBackendState(); + backend.setName(clusterStat.getClusterId()); + backend.setHealth(clusterStat.getHealthy()); + backend.setWorkerCount(clusterStat.getNumWorkerNodes()); + GatewayBackendState updated = haGatewayBackendStateManager.addBackend(backend); } } - - private void notifyUnhealthyCluster(ClusterStats clusterStats) { - notifier.sendNotification(String.format("%s - Cluster unhealthy", - clusterStats.getClusterId()), - clusterStats.toString()); - } - - private void notifyForTooManyQueuedQueries(ClusterStats clusterStats) { - notifier.sendNotification(String.format("%s - Too many queued queries", - clusterStats.getClusterId()), clusterStats.toString()); - } - - private void notifyForNoWorkers(ClusterStats clusterStats) { - notifier.sendNotification(String.format("%s - Number of workers", - clusterStats.getClusterId()), clusterStats.toString()); - } - - } diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/module/HaGatewayProviderModule.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/module/HaGatewayProviderModule.java index 893111bf..a54b636e 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/module/HaGatewayProviderModule.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/module/HaGatewayProviderModule.java @@ -9,6 +9,8 @@ import 
com.lyft.data.gateway.ha.handler.QueryIdCachingProxyHandler; import com.lyft.data.gateway.ha.persistence.JdbcConnectionManager; import com.lyft.data.gateway.ha.router.GatewayBackendManager; +import com.lyft.data.gateway.ha.router.GatewayBackendStateManager; +import com.lyft.data.gateway.ha.router.HaGatewayBackendStateManager; import com.lyft.data.gateway.ha.router.HaGatewayManager; import com.lyft.data.gateway.ha.router.HaQueryHistoryManager; import com.lyft.data.gateway.ha.router.HaRoutingManager; @@ -25,6 +27,7 @@ public class HaGatewayProviderModule extends AppModule upcast(List + gatewayBackendStateHistoryList) { + List gatewayBackendStates = new ArrayList<>(); + for (GatewayBackendStateHistory dao : gatewayBackendStateHistoryList) { + GatewayBackendState gatewayBackendState = new GatewayBackendState(); + gatewayBackendState.setName(dao.getString(name)); + gatewayBackendState.setHealth(dao.getBoolean(health)); + gatewayBackendState.setWorkerCount(dao.getInteger(workerCount)); + // Collect the converted row; without this the method always returns an empty list. + gatewayBackendStates.add(gatewayBackendState); + } + return gatewayBackendStates; + } + + public static void create(GatewayBackendStateHistory model, + GatewayBackendState gatewayBackendState) { + model.set(name, gatewayBackendState.getName()); + model.set(health, gatewayBackendState.getHealth()); + model.set(workerCount, gatewayBackendState.getWorkerCount()); + model.insert(); + } +} diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/resource/EntityEditorResource.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/resource/EntityEditorResource.java index bc4c0129..58907ecf 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/resource/EntityEditorResource.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/resource/EntityEditorResource.java @@ -6,6 +6,7 @@ import com.google.inject.Inject; import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration; import com.lyft.data.gateway.ha.router.GatewayBackendManager; +import com.lyft.data.gateway.ha.router.GatewayBackendStateManager; import 
io.dropwizard.views.View; import java.io.IOException; @@ -31,6 +32,7 @@ public class EntityEditorResource { @Inject private GatewayBackendManager gatewayBackendManager; + @Inject private GatewayBackendStateManager gatewayBackendStateManager; public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @GET @@ -46,7 +48,8 @@ protected EntityView(String templateName) { } private enum EntityType { - GATEWAY_BACKEND + GATEWAY_BACKEND, + GATEWAY_BACKEND_STATE } @GET @@ -87,6 +90,8 @@ public Response getAllEntitiesForType(@PathParam("entityType") String entityType switch (entityType) { case GATEWAY_BACKEND: return Response.ok(gatewayBackendManager.getAllBackends()).build(); + case GATEWAY_BACKEND_STATE: + return Response.ok(gatewayBackendStateManager.getAllBackendStates()).build(); default: } return Response.ok(ImmutableList.of()).build(); diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/GatewayBackendStateManager.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/GatewayBackendStateManager.java new file mode 100644 index 00000000..4af9e3ea --- /dev/null +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/GatewayBackendStateManager.java @@ -0,0 +1,24 @@ +package com.lyft.data.gateway.ha.router; + +//import com.lyft.data.gateway.ha.persistence.dao.GatewayBackendStateHistory; +import java.util.List; +import lombok.Data; +import lombok.ToString; + +public interface GatewayBackendStateManager { + + List getAllBackendStates(); + + GatewayBackendState addBackend(GatewayBackendState backend); + + @Data + @ToString + class GatewayBackendState { + + private String name; + private Boolean health; + private int workerCount; + + } + +} diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/HaGatewayBackendStateManager.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/HaGatewayBackendStateManager.java new file mode 100644 index 00000000..6a6bd621 --- /dev/null +++ 
b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/HaGatewayBackendStateManager.java @@ -0,0 +1,41 @@ +package com.lyft.data.gateway.ha.router; + +import com.lyft.data.gateway.ha.persistence.JdbcConnectionManager; +import com.lyft.data.gateway.ha.persistence.dao.GatewayBackendStateHistory; + +import java.util.List; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class HaGatewayBackendStateManager implements GatewayBackendStateManager { + + private JdbcConnectionManager connectionManager; + + public HaGatewayBackendStateManager(JdbcConnectionManager connectionManager) { + this.connectionManager = connectionManager; + } + + @Override + public List getAllBackendStates() { + // Load all persisted backend-state rows and convert them; the connection is closed in finally + try { + connectionManager.open(); + List proxyBackendList = GatewayBackendStateHistory.findAll(); + return GatewayBackendStateHistory.upcast(proxyBackendList); + } finally { + connectionManager.close(); + } + } + + @Override + public GatewayBackendState addBackend(GatewayBackendState backend) { + try { + connectionManager.open(); + GatewayBackendStateHistory.create(new GatewayBackendStateHistory(), backend); + } finally { + connectionManager.close(); + } + + return backend; + } +} From 858b6bfe23992e5dd4aeecfdfc6aadab7debcd1d Mon Sep 17 00:00:00 2001 From: Li Date: Thu, 5 Mar 2020 16:57:03 -0800 Subject: [PATCH 2/2] add previous --- .../ha/clustermonitor/HealthChecker.java | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/HealthChecker.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/HealthChecker.java index 1987ba4b..ef5e8ac2 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/HealthChecker.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/HealthChecker.java @@ -19,13 +19,37 @@ public HealthChecker(Notifier notifier) { @Override public void observe(List 
clustersStats) { - for (ClusterStats clusterStat : clustersStats) { - // TODO: Store the stats in a persistent layer + for (ClusterStats clusterStats : clustersStats) { + if (!clusterStats.isHealthy()) { + notifyUnhealthyCluster(clusterStats); + } else { + if (clusterStats.getQueuedQueryCount() > MAX_THRESHOLD_QUEUED_QUERY_COUNT) { + notifyForTooManyQueuedQueries(clusterStats); + } + if (clusterStats.getNumWorkerNodes() < 1) { + notifyForNoWorkers(clusterStats); + } + } GatewayBackendState backend = new GatewayBackendState(); - backend.setName(clusterStat.getClusterId()); - backend.setHealth(clusterStat.getHealthy()); - backend.setWorkerCount(clusterStat.getNumWorkerNodes()); + backend.setName(clusterStats.getClusterId()); + backend.setHealth(clusterStats.isHealthy()); + backend.setWorkerCount(clusterStats.getNumWorkerNodes()); GatewayBackendState updated = haGatewayBackendStateManager.addBackend(backend); } } + private void notifyUnhealthyCluster(ClusterStats clusterStats) { + notifier.sendNotification(String.format("%s - Cluster unhealthy", + clusterStats.getClusterId()), + clusterStats.toString()); + } + + private void notifyForTooManyQueuedQueries(ClusterStats clusterStats) { + notifier.sendNotification(String.format("%s - Too many queued queries", + clusterStats.getClusterId()), clusterStats.toString()); + } + + private void notifyForNoWorkers(ClusterStats clusterStats) { + notifier.sendNotification(String.format("%s - Number of workers", + clusterStats.getClusterId()), clusterStats.toString()); + } }