Skip to content

Commit

Permalink
#615 collect circuit breaker metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mcweba committed Dec 23, 2024
1 parent 6435599 commit fac9614
Show file tree
Hide file tree
Showing 6 changed files with 421 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,14 @@ public void releaseLock(Lock lockImpl, String lock, String token, Logger log){
}
});
}

/**
 * Derives a lock expiry time from the interval of the task that is guarded by the lock.
 * <p>
 * The result is half of the given interval (integer division, so odd values are rounded
 * down), but never less than {@code 1}. Intervals of {@code 1}, {@code 0} or any negative
 * value therefore all yield {@code 1}. The result is expressed in the same unit as the input.
 *
 * @param taskInterval the interval of the task
 * @return half of {@code taskInterval}, with a lower bound of {@code 1}
 */
public static long calcLockExpiry(long taskInterval) {
    return Math.max(1L, taskInterval / 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ public void setUp(){
lockUtil = new LockUtil(newGateleenWastefulExceptionFactory());
}

/**
 * Verifies that {@code LockUtil.calcLockExpiry} halves the task interval (rounding down)
 * and never returns less than 1, including for zero and negative intervals.
 */
@Test
public void testCalculateLockExpiry(TestContext context) {
    // each row: { task interval, expected lock expiry }
    final long[][] expectations = {
            {1, 1},
            {0, 1},
            {-20, 1},
            {2, 1},
            {3, 1},
            {4, 2},
            {8, 4},
            {64, 32},
            {1500, 750},
            {10001, 5000}
    };
    for (long[] row : expectations) {
        final long taskInterval = row[0];
        final long expectedExpiry = row[1];
        context.assertEquals(expectedExpiry, LockUtil.calcLockExpiry(taskInterval));
    }
}

@Test
public void testAcquireLockWithoutLockImplementationDefined(TestContext context) {
Async async = context.async();
Expand Down
15 changes: 15 additions & 0 deletions gateleen-playground/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@
<artifactId>gateleen-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-micrometer-metrics</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>${micrometer.version}</version>
</dependency>
<dependency>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class RedisQueueCircuitBreakerStorage implements QueueCircuitBreakerStora
public static final String STORAGE_OPEN_CIRCUITS = STORAGE_PREFIX + "open-circuits";
public static final String STORAGE_QUEUES_TO_UNLOCK = STORAGE_PREFIX + "queues-to-unlock";
public static final String FIELD_STATE = "state";
public static final String FIELD_STATUS = "status";
public static final String FIELD_FAILRATIO = "failRatio";
public static final String FIELD_CIRCUIT = "circuit";
public static final String FIELD_METRICNAME = "metric";
Expand Down Expand Up @@ -111,7 +112,7 @@ public Future<JsonObject> getQueueCircuitInformation(String circuitHash) {
String circuit = Objects.toString(event.result().get(2), null);
String metric = Objects.toString(event.result().get(3), null);
JsonObject result = new JsonObject();
result.put("status", state.name().toLowerCase());
result.put(FIELD_STATUS, state.name().toLowerCase());
JsonObject info = new JsonObject();
if (failRatioStr != null) {
info.put(FIELD_FAILRATIO, Integer.valueOf(failRatioStr));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package org.swisspush.gateleen.queue.queuing.circuitbreaker.monitoring;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.*;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;
import org.swisspush.gateleen.core.lock.Lock;
import org.swisspush.gateleen.core.util.Address;
import org.swisspush.gateleen.core.util.LockUtil;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreakerStorage;
import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static org.swisspush.gateleen.core.util.LockUtil.acquireLock;
import static org.swisspush.gateleen.core.util.LockUtil.calcLockExpiry;
import static org.swisspush.gateleen.queue.queuing.circuitbreaker.impl.RedisQueueCircuitBreakerStorage.*;

public class QueueCircuitBreakerMetricsCollector {

private final Logger log = LoggerFactory.getLogger(QueueCircuitBreakerMetricsCollector.class);

private final Lock lock;
private final LockUtil lockUtil;

public static final String COLLECT_METRICS_TASK_LOCK = "collectCircuitBreakerMetrics";
public static final String CIRCUIT_BREAKER_STATUS_METRIC = "gateleen.circuitbreaker.status";
public static final String CIRCUIT_BREAKER_FAILRATIO_METRIC = "gateleen.circuitbreaker.failratio";

private final QueueCircuitBreakerStorage queueCircuitBreakerStorage;
private final MeterRegistry meterRegistry;
private final long metricCollectionIntervalMs;

private final Map<String, AtomicInteger> circuitStateMap = new HashMap<>();
private final Map<String, AtomicInteger> circuitFailRatioMap = new HashMap<>();

public QueueCircuitBreakerMetricsCollector(Vertx vertx, Lock lock, QueueCircuitBreakerStorage queueCircuitBreakerStorage,
MeterRegistry meterRegistry, GateleenExceptionFactory exceptionFactory,
long metricCollectionIntervalSeconds) {
this.lock = lock;
this.lockUtil = new LockUtil(exceptionFactory);
this.queueCircuitBreakerStorage = queueCircuitBreakerStorage;
this.meterRegistry = meterRegistry;

this.metricCollectionIntervalMs = metricCollectionIntervalSeconds * 1000;

vertx.setPeriodic(metricCollectionIntervalMs, event -> {
collectMetrics().onFailure(event1 -> log.error("Could not collect metrics. Message: {}", event1.getMessage()));
});
}

public Future<Void> collectMetrics() {
log.debug("Collecting metrics");
Promise<Void> promise = Promise.promise();
final String token = createToken(COLLECT_METRICS_TASK_LOCK);
acquireLock(lock, COLLECT_METRICS_TASK_LOCK, token, calcLockExpiry(metricCollectionIntervalMs), log).onComplete(lockEvent -> {
if (lockEvent.succeeded()) {
if (lockEvent.result()) {
handleMetricsCollection(token).onComplete(event -> {
if (event.succeeded()) {
promise.complete();
} else {
promise.fail(event.cause());
}
});
} else {
promise.complete();
}
} else {
log.error("Could not acquire lock '{}'. Message: {}", COLLECT_METRICS_TASK_LOCK, lockEvent.cause().getMessage());
promise.fail(lockEvent.cause().getMessage());
}
});
return promise.future();
}

private Future<Void> handleMetricsCollection(String token) {
return queueCircuitBreakerStorage.getAllCircuits().compose((Function<JsonObject, Future<Void>>) entries -> {
extractMetricsFromCircuitsObject(entries);
return Future.succeededFuture();
}).andThen(event -> lockUtil.releaseLock(lock, COLLECT_METRICS_TASK_LOCK, token, log));
}

private void extractMetricsFromCircuitsObject(JsonObject circuits) {
circuits.stream().forEach(entry -> {
String circuitName = entry.getKey();
JsonObject circuitValue = (JsonObject) entry.getValue();
QueueCircuitState queueCircuitState = QueueCircuitState.fromString(circuitValue.getString(FIELD_STATUS), null);
if (queueCircuitState == null) {
log.warn("No status found for circuit '{}'", circuitName);
return;
}

JsonObject infos = circuitValue.getJsonObject("infos");
if (infos != null) {
String metric = infos.getString(FIELD_METRICNAME);
Integer failRatio = infos.getInteger(FIELD_FAILRATIO);
if (metric != null && failRatio != null) {
publishMetric(metric, queueCircuitState, failRatio);
}
}
});
}

private void publishMetric(String metricName, QueueCircuitState queueCircuitState, int failRatio) {
Integer stateValue = circuitStateToValue(queueCircuitState);
if(stateValue != null) {
getCircuitStateMeter(metricName).set(stateValue);
}
getCircuitFailRatioMeter(metricName).set(failRatio);
}

private String createToken(String appendix) {
return Address.instanceAddress() + "_" + System.currentTimeMillis() + "_" + appendix;
}

private AtomicInteger getCircuitStateMeter(String metricName) {
return circuitStateMap.computeIfAbsent(metricName, key -> {
AtomicInteger newMeterValue = new AtomicInteger();
Gauge.builder(CIRCUIT_BREAKER_STATUS_METRIC, newMeterValue, AtomicInteger::get)
.description("Status of the circuit, 0=CLOSED, 1=HALF_OPEN, 2=OPEN")
.tag("metricName", metricName)
.register(meterRegistry);
return newMeterValue;
});
}

private AtomicInteger getCircuitFailRatioMeter(String metricName) {
return circuitFailRatioMap.computeIfAbsent(metricName, key -> {
AtomicInteger newMeterValue = new AtomicInteger();
Gauge.builder(CIRCUIT_BREAKER_FAILRATIO_METRIC, newMeterValue, AtomicInteger::get)
.description("Fail ratio of the circuit in percentage")
.tag("metricName", metricName)
.register(meterRegistry);
return newMeterValue;
});
}

private Integer circuitStateToValue(QueueCircuitState queueCircuitState) {
if (queueCircuitState == null) {
return null;
}
switch (queueCircuitState) {
case CLOSED:
return 0;
case HALF_OPEN:
return 1;
case OPEN:
return 2;
default:
return null;
}
}
}
Loading

0 comments on commit fac9614

Please sign in to comment.