Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/issue615 provide circuit breaker metrics #623

Merged
merged 3 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,14 @@ public void releaseLock(Lock lockImpl, String lock, String token, Logger log){
}
});
}

/**
* Calculate the lock expiry time. This is a simple helper to work with the lock expiry time.
*
* @param taskInterval the interval of the task
* @return the calculated lock expiry time
*/
public static long calcLockExpiry(long taskInterval) {
return taskInterval <= 1 ? 1 : taskInterval / 2;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ public void setUp(){
lockUtil = new LockUtil(newGateleenWastefulExceptionFactory());
}

@Test
public void testCalculateLockExpiry(TestContext context) {
context.assertEquals(1L, LockUtil.calcLockExpiry(1));
context.assertEquals(1L, LockUtil.calcLockExpiry(0));
context.assertEquals(1L, LockUtil.calcLockExpiry(-20));
context.assertEquals(1L, LockUtil.calcLockExpiry(2));
context.assertEquals(1L, LockUtil.calcLockExpiry(3));
context.assertEquals(2L, LockUtil.calcLockExpiry(4));
context.assertEquals(4L, LockUtil.calcLockExpiry(8));
context.assertEquals(32L, LockUtil.calcLockExpiry(64));
context.assertEquals(750L, LockUtil.calcLockExpiry(1500));
context.assertEquals(5000L, LockUtil.calcLockExpiry(10001));
}

@Test
public void testAcquireLockWithoutLockImplementationDefined(TestContext context) {
Async async = context.async();
Expand Down
15 changes: 15 additions & 0 deletions gateleen-playground/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@
<artifactId>gateleen-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-micrometer-metrics</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>${micrometer.version}</version>
</dependency>
<dependency>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ public Future<Long> unlockSampleQueues() {
failedFutures.add(event1.cause().getMessage());
}
if (futureCounter.get() == 0) {
if (failedFutures.size() > 0) {
if (!failedFutures.isEmpty()) {
promise.fail("The following queues could not be unlocked: " + failedFutures);
} else {
promise.complete((long) queuesToUnlock.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.swisspush.gateleen.queue.queuing.circuitbreaker.impl;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -38,6 +37,7 @@ public class RedisQueueCircuitBreakerStorage implements QueueCircuitBreakerStora
public static final String STORAGE_OPEN_CIRCUITS = STORAGE_PREFIX + "open-circuits";
public static final String STORAGE_QUEUES_TO_UNLOCK = STORAGE_PREFIX + "queues-to-unlock";
public static final String FIELD_STATE = "state";
public static final String FIELD_STATUS = "status";
public static final String FIELD_FAILRATIO = "failRatio";
public static final String FIELD_CIRCUIT = "circuit";
public static final String FIELD_METRICNAME = "metricName";
Expand Down Expand Up @@ -111,7 +111,7 @@ public Future<JsonObject> getQueueCircuitInformation(String circuitHash) {
String circuit = Objects.toString(event.result().get(2), null);
String metric = Objects.toString(event.result().get(3), null);
JsonObject result = new JsonObject();
result.put("status", state.name().toLowerCase());
result.put(FIELD_STATUS, state.name().toLowerCase());
JsonObject info = new JsonObject();
if (failRatioStr != null) {
info.put(FIELD_FAILRATIO, Integer.valueOf(failRatioStr));
Expand Down Expand Up @@ -246,7 +246,7 @@ public Future<Void> closeAllCircuits() {
Future<Void> closeOpenCircuitsFuture = closeCircuitsByKey(STORAGE_OPEN_CIRCUITS);
Future<Void> closeHalfOpenCircuitsFuture = closeCircuitsByKey(STORAGE_HALFOPEN_CIRCUITS);

CompositeFuture.all(closeOpenCircuitsFuture, closeHalfOpenCircuitsFuture).onComplete(event -> {
Future.all(closeOpenCircuitsFuture, closeHalfOpenCircuitsFuture).onComplete(event -> {
if (event.succeeded()) {
promise.complete();
} else {
Expand All @@ -261,14 +261,14 @@ private Future<Void> closeCircuitsByKey(String key) {
Promise<Void> promise = Promise.promise();
redisProvider.redis().onSuccess(redisAPI -> redisAPI.smembers(key, event -> {
if (event.succeeded()) {
List<Future> promises = new ArrayList<>();
List<Future<Void>> promises = new ArrayList<>();
for (Response circuit : event.result()) {
promises.add(closeCircuit(circuit.toString(), false));
}
if (promises.size() == 0) {
if (promises.isEmpty()) {
promise.complete();
} else {
CompositeFuture.all(promises).onComplete(event1 -> {
Future.all(promises).onComplete(event1 -> {
if (event1.succeeded()) {
promise.complete();
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package org.swisspush.gateleen.queue.queuing.circuitbreaker.monitoring;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.*;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;
import org.swisspush.gateleen.core.lock.Lock;
import org.swisspush.gateleen.core.util.Address;
import org.swisspush.gateleen.core.util.LockUtil;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreakerStorage;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static org.swisspush.gateleen.core.util.LockUtil.acquireLock;
import static org.swisspush.gateleen.core.util.LockUtil.calcLockExpiry;
import static org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage.*;

/**
* Class responsible for collecting metrics for the Queue Circuit Breaker.
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class QueueCircuitBreakerMetricsCollector {

private final Logger log = LoggerFactory.getLogger(QueueCircuitBreakerMetricsCollector.class);

private final Lock lock;
private final LockUtil lockUtil;

public static final String COLLECT_METRICS_TASK_LOCK = "collectCircuitBreakerMetrics";
public static final String CIRCUIT_BREAKER_STATUS_METRIC = "gateleen.circuitbreaker.status";
public static final String CIRCUIT_BREAKER_FAILRATIO_METRIC = "gateleen.circuitbreaker.failratio";

private final QueueCircuitBreakerStorage queueCircuitBreakerStorage;
private final MeterRegistry meterRegistry;
private final long metricCollectionIntervalMs;

private final Map<String, AtomicInteger> circuitStateMap = new HashMap<>();
private final Map<String, AtomicInteger> circuitFailRatioMap = new HashMap<>();

/**
* Constructor for QueueCircuitBreakerMetricsCollector.
*
* @param vertx Vertx instance
* @param lock Lock instance
* @param queueCircuitBreakerStorage Storage for circuit breaker data
* @param meterRegistry Meter registry for metrics
* @param exceptionFactory Exception factory
* @param metricCollectionIntervalSeconds Interval for metric collection in seconds
*/
public QueueCircuitBreakerMetricsCollector(Vertx vertx, Lock lock, QueueCircuitBreakerStorage queueCircuitBreakerStorage,
MeterRegistry meterRegistry, GateleenExceptionFactory exceptionFactory,
long metricCollectionIntervalSeconds) {
this.lock = lock;
this.lockUtil = new LockUtil(exceptionFactory);
this.queueCircuitBreakerStorage = queueCircuitBreakerStorage;
this.meterRegistry = meterRegistry;

this.metricCollectionIntervalMs = metricCollectionIntervalSeconds * 1000;

vertx.setPeriodic(metricCollectionIntervalMs, event -> collectMetrics()
.onFailure(event1 -> log.error("Could not collect metrics. Message: {}", event1.getMessage())));
}

/**
* Collects metrics for the Queue Circuit Breaker.
*
* @return Future representing the completion of the metric collection
*/
public Future<Void> collectMetrics() {
log.debug("Collecting metrics");
Promise<Void> promise = Promise.promise();
final String token = createToken();
acquireLock(lock, COLLECT_METRICS_TASK_LOCK, token, calcLockExpiry(metricCollectionIntervalMs), log).onComplete(lockEvent -> {
if (lockEvent.succeeded()) {
if (lockEvent.result()) {
handleMetricsCollection(token).onComplete(event -> {
if (event.succeeded()) {
promise.complete();
} else {
promise.fail(event.cause());
}
});
} else {
promise.complete();
}
} else {
log.error("Could not acquire lock '{}'. Message: {}", COLLECT_METRICS_TASK_LOCK, lockEvent.cause().getMessage());
promise.fail(lockEvent.cause().getMessage());
}
});
return promise.future();
}

private Future<Void> handleMetricsCollection(String token) {
return queueCircuitBreakerStorage.getAllCircuits().compose((Function<JsonObject, Future<Void>>) entries -> {
extractMetricsFromCircuitsObject(entries);
return Future.succeededFuture();
}).andThen(event -> lockUtil.releaseLock(lock, COLLECT_METRICS_TASK_LOCK, token, log));
}

private void extractMetricsFromCircuitsObject(JsonObject circuits) {
circuits.stream().forEach(entry -> {
String circuitName = entry.getKey();
JsonObject circuitValue = (JsonObject) entry.getValue();
QueueCircuitState queueCircuitState = QueueCircuitState.fromString(circuitValue.getString(FIELD_STATUS), null);
if (queueCircuitState == null) {
log.warn("No status found for circuit '{}'", circuitName);
return;
}

JsonObject infos = circuitValue.getJsonObject("infos");
if (infos != null) {
String metric = infos.getString(FIELD_METRICNAME);
Integer failRatio = infos.getInteger(FIELD_FAILRATIO);
if (metric != null && failRatio != null) {
publishMetric(metric, queueCircuitState, failRatio);
}
}
});
}

private void publishMetric(String metricName, QueueCircuitState queueCircuitState, int failRatio) {
Integer stateValue = circuitStateToValue(queueCircuitState);
if(stateValue != null) {
getCircuitStateMeter(metricName).set(stateValue);
}
getCircuitFailRatioMeter(metricName).set(failRatio);
}

private String createToken() {
return Address.instanceAddress() + "_" + System.currentTimeMillis() + "_" + COLLECT_METRICS_TASK_LOCK;
}

private AtomicInteger getCircuitStateMeter(String metricName) {
return circuitStateMap.computeIfAbsent(metricName, key -> {
AtomicInteger newMeterValue = new AtomicInteger();
Gauge.builder(CIRCUIT_BREAKER_STATUS_METRIC, newMeterValue, AtomicInteger::get)
.description("Status of the circuit, 0=CLOSED, 1=HALF_OPEN, 2=OPEN")
.tag("metricName", metricName)
.register(meterRegistry);
return newMeterValue;
});
}

private AtomicInteger getCircuitFailRatioMeter(String metricName) {
return circuitFailRatioMap.computeIfAbsent(metricName, key -> {
AtomicInteger newMeterValue = new AtomicInteger();
Gauge.builder(CIRCUIT_BREAKER_FAILRATIO_METRIC, newMeterValue, AtomicInteger::get)
.description("Fail ratio of the circuit in percentage")
.tag("metricName", metricName)
.register(meterRegistry);
return newMeterValue;
});
}

private Integer circuitStateToValue(QueueCircuitState queueCircuitState) {
switch (queueCircuitState) {
case CLOSED:
return 0;
case HALF_OPEN:
return 1;
case OPEN:
return 2;
default:
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
int result = pattern.hashCode();
result = 31 * result + circuitHash.hashCode();
return result;
return Objects.hash(pattern, circuitHash);
}
}
Loading
Loading