Skip to content

Commit

Permalink
#238 Collect queue size over EventBus / Refactored MetricsCollector
Browse files Browse the repository at this point in the history
  • Loading branch information
mcweba committed Jan 17, 2025
1 parent a17847a commit 2e48a72
Show file tree
Hide file tree
Showing 12 changed files with 493 additions and 193 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The following configuration values are available:
| redisReconnectAttempts | 0 | The amount of attempts to reconnect when redis connection is lost. Use **0** to not reconnect at all or **-1** to reconnect indefinitely. |
| redisReconnectDelaySec | 30 | The interval [s] to attempt to reconnect when redis connection is lost. |
| redisPoolRecycleTimeoutMs | 180000 | The timeout [ms] when the connection pool is recycled. Use **-1** when having reconnect feature enabled. |
| redisMonitoringEnabled | true | Enable / disable monitoring of redis metrics |
| micrometerMetricsEnabled | false | Enable / disable collection of metrics using micrometer |
| micrometerMetricsIdentifier | default | Identifier to track values from multiple redisques instances |
| httpRequestHandlerEnabled | false | Enable / disable the HTTP API |
Expand All @@ -80,7 +81,7 @@ The following configuration values are available:
| dequeueStatisticReportIntervalSec | -1 | The interval [s] to publish the dequeue statistics into shared map. Use **-1** to not publish at all. In a hazelcast-cluster environment need config Semaphore first, see: [Semaphore Config](#Semaphore Config) |
| publish-metrics-address | | The EventBus address to send collected redis metrics to |
| metric-storage-name | queue | The name of the storage used in the published metrics |
| metric-refresh-period | 10 | The frequency [s] of collecting metrics from redis database |
| metric-refresh-period | 0 | The frequency [s] of collecting metrics from redis database. Use **0** or **-1** to not periodically collect metrics |

### Configuration util

Expand Down
25 changes: 1 addition & 24 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.swisspush.redisques;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonArray;
Expand All @@ -14,7 +12,6 @@
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import static java.lang.Long.compare;
Expand Down Expand Up @@ -49,18 +46,14 @@ public class QueueStatsService {
private final RedisQuesExceptionFactory exceptionFactory;
private final Semaphore incomingRequestQuota;

private final AtomicLong maxQueueSize = new AtomicLong(0);

public QueueStatsService(
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota,
MeterRegistry meterRegistry,
String metricsIdentifier
Semaphore incomingRequestQuota
) {
this.vertx = vertx;
this.eventBus = eventBus;
Expand All @@ -69,13 +62,6 @@ public QueueStatsService(
this.dequeueStatisticCollector = dequeueStatisticCollector;
this.exceptionFactory = exceptionFactory;
this.incomingRequestQuota = incomingRequestQuota;

if (meterRegistry != null) {
Gauge.builder(MetricMeter.MAX_QUEUE_SIZE.getId(), maxQueueSize, AtomicLong::get).
description(MetricMeter.MAX_QUEUE_SIZE.getDescription())
.tag(MetricTags.IDENTIFIER.getId(), metricsIdentifier).
register(meterRegistry);
}
}

public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
Expand Down Expand Up @@ -174,19 +160,10 @@ private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsu
int limit = req.mentor.limit(req.mCtx);
if (limit != 0 && queues.size() > limit) queues = queues.subList(0, limit);
req.queues = queues;
collectMaxQueueSize(queues);
onDone.accept(null, req);
});
}

private void collectMaxQueueSize(List<Queue> queues) {
if (queues.isEmpty()) {
maxQueueSize.set(0);
} else {
maxQueueSize.set(queues.get(0).getSize());
}
}

private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
long begGetQueueStatsMs = currentTimeMillis();
assert req.queueNames != null;
Expand Down
25 changes: 16 additions & 9 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.lock.Lock;
import org.swisspush.redisques.lock.impl.RedisBasedLock;
import org.swisspush.redisques.metrics.PeriodicMetricsCollector;
import org.swisspush.redisques.metrics.MetricsCollector;
import org.swisspush.redisques.metrics.MetricsCollectorScheduler;
import org.swisspush.redisques.performance.UpperBoundParallel;
import org.swisspush.redisques.scheduling.PeriodicSkipScheduler;
import org.swisspush.redisques.util.*;
Expand Down Expand Up @@ -363,14 +364,16 @@ private void initMicrometerMetrics(RedisquesConfiguration modConfig) {
meterRegistry = BackendRegistries.getDefaultNow();
}
String metricsIdentifier = modConfig.getMicrometerMetricsIdentifier();
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
.description(MetricMeter.DEQUEUE.getDescription()).tag(MetricTags.IDENTIFIER.getId(), metricsIdentifier).register(meterRegistry);

String address = modConfig.getAddress();
int metricRefreshPeriod = modConfig.getMetricRefreshPeriod();
String identifier = modConfig.getMicrometerMetricsIdentifier();
new PeriodicMetricsCollector(vertx, uid, periodicSkipScheduler, address, identifier, meterRegistry, lock,
metricRefreshPeriod);
if (metricRefreshPeriod > 0) {
String identifier = modConfig.getMicrometerMetricsIdentifier();
MetricsCollector metricsCollector = new MetricsCollector(vertx, uid, address, identifier, meterRegistry, lock, metricRefreshPeriod);
new MetricsCollectorScheduler(vertx, metricsCollector, metricRefreshPeriod);
}
}

private void initialize() {
Expand All @@ -386,8 +389,8 @@ private void initialize() {
}

RedisquesHttpRequestHandler.init(
vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota, meterRegistry);
vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota);

// only initialize memoryUsageProvider when not provided in the constructor
if (memoryUsageProvider == null) {
Expand Down Expand Up @@ -511,9 +514,13 @@ private void registerNotExpiredQueueCheck() {
});
}

private void registerMetricsGathering(RedisquesConfiguration configuration){
private void registerMetricsGathering(RedisquesConfiguration configuration) {
if (!configuration.getRedisMonitoringEnabled()) {
return;
}

String metricsAddress = configuration.getPublishMetricsAddress();
if(Strings.isNullOrEmpty(metricsAddress)) {
if (Strings.isNullOrEmpty(metricsAddress)) {
return;
}
String metricStorageName = configuration.getMetricStorageName();
Expand Down
50 changes: 32 additions & 18 deletions src/main/java/org/swisspush/redisques/action/MonitorAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.util.RedisquesConfiguration;

import java.util.Base64;

import static org.swisspush.redisques.util.RedisquesAPI.*;
import static org.swisspush.redisques.util.RedisquesAPI.LIMIT;

Expand All @@ -24,7 +27,7 @@ public MonitorAction(RedisquesConfiguration modConfig, HttpClient client, Logger

@Override
public void execute(Message<JsonObject> event) {
if(!modConfig.getHttpRequestHandlerEnabled()) {
if (!modConfig.getHttpRequestHandlerEnabled()) {
event.reply(createErrorReply().put(MESSAGE, "HttpRequestHandler is disabled"));
return;
}
Expand All @@ -34,22 +37,33 @@ public void execute(Message<JsonObject> event) {

String requestParams = "?limit=" + limit + "&emptyQueues=" + emptyQueues;

client.request(HttpMethod.GET, modConfig.getHttpRequestHandlerPort(), "localhost",
modConfig.getHttpRequestHandlerPrefix()+ "/monitor" + requestParams)
.compose(req -> req.send().compose(response -> {
if (response.statusCode() == 200) {
return response.body();
} else {
throw new RuntimeException("Failed to fetch monitor data: " + response.statusMessage());
}
}))
.onComplete(ar -> {
if (ar.succeeded()) {
event.reply(createOkReply().put(VALUE, ar.result().toJsonObject()));
} else {
event.reply(createErrorReply().put(MESSAGE, ar.cause().getMessage()));
log.warn("Failed to fetch monitor data", ar.cause());
}
});
RequestOptions requestOptions = new RequestOptions()
.setMethod(HttpMethod.GET)
.setPort(modConfig.getHttpRequestHandlerPort())
.setHost("localhost")
.setURI(modConfig.getHttpRequestHandlerPrefix() + "/monitor" + requestParams);

if(modConfig.getHttpRequestHandlerAuthenticationEnabled()) {
String credentials = modConfig.getHttpRequestHandlerUsername() + ":" + modConfig.getHttpRequestHandlerPassword();
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
requestOptions.putHeader("Authorization", "Basic " + encodedCredentials);
}

client.request(requestOptions)
.compose(req -> req.send().compose(response -> {
if (response.statusCode() == 200) {
return response.body();
} else {
throw new RuntimeException("Failed to fetch monitor data: " + response.statusMessage());
}
}))
.onComplete(ar -> {
if (ar.succeeded()) {
event.reply(createOkReply().put(VALUE, ar.result().toJsonObject()));
} else {
event.reply(createErrorReply().put(MESSAGE, ar.cause().getMessage()));
log.warn("Failed to fetch monitor data", ar.cause());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.swisspush.redisques.handler;

import io.micrometer.core.instrument.MeterRegistry;
import io.netty.util.internal.StringUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -84,14 +83,14 @@ public class RedisquesHttpRequestHandler implements Handler<HttpServerRequest> {
public static void init(
Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector, RedisQuesExceptionFactory exceptionFactory,
Semaphore queueStatsRequestQuota, MeterRegistry meterRegistry
Semaphore queueStatsRequestQuota
) {
log.info("Enabling http request handler: {}", modConfig.getHttpRequestHandlerEnabled());
if (modConfig.getHttpRequestHandlerEnabled()) {
if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) {
var handler = new RedisquesHttpRequestHandler(
vertx, modConfig, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota, meterRegistry);
exceptionFactory, queueStatsRequestQuota);
// in Vert.x 2x 100-continues was activated per default, in vert.x 3x it is off per default.
HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true);
vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> {
Expand Down Expand Up @@ -126,8 +125,7 @@ private RedisquesHttpRequestHandler(
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore queueStatsRequestQuota,
MeterRegistry meterRegistry
Semaphore queueStatsRequestQuota
) {
this.vertx = vertx;
this.router = Router.router(vertx);
Expand All @@ -140,7 +138,7 @@ private RedisquesHttpRequestHandler(
this.exceptionFactory = exceptionFactory;
this.queueStatsService = new QueueStatsService(
vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota, meterRegistry, modConfig.getMicrometerMetricsIdentifier());
exceptionFactory, queueStatsRequestQuota);

final String prefix = modConfig.getHttpRequestHandlerPrefix();

Expand Down
Loading

0 comments on commit 2e48a72

Please sign in to comment.