Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#238 Collect queue size over EventBus / Refactored MetricsCollector #239

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The following configuration values are available:
| redisReconnectAttempts | 0 | The amount of attempts to reconnect when redis connection is lost. Use **0** to not reconnect at all or **-1** to reconnect indefinitely. |
| redisReconnectDelaySec | 30 | The interval [s] to attempt to reconnect when redis connection is lost. |
| redisPoolRecycleTimeoutMs | 180000 | The timeout [ms] when the connection pool is recycled. Use **-1** when having reconnect feature enabled. |
| redisMonitoringEnabled | true | Enable / disable monitoring of redis metrics |
| micrometerMetricsEnabled | false | Enable / disable collection of metrics using micrometer |
| micrometerMetricsIdentifier | default | Identifier to track values from multiple redisques instances |
| httpRequestHandlerEnabled | false | Enable / disable the HTTP API |
Expand All @@ -80,7 +81,7 @@ The following configuration values are available:
| dequeueStatisticReportIntervalSec | -1 | The interval [s] to publish the dequeue statistics into shared map. Use **-1** to not publish at all. In a hazelcast-cluster environment need config Semaphore first, see: [Semaphore Config](#Semaphore Config) |
| publish-metrics-address | | The EventBus address to send collected redis metrics to |
| metric-storage-name | queue | The name of the storage used in the published metrics |
| metric-refresh-period | 10 | The frequency [s] of collecting metrics from redis database |
| metric-refresh-period | 0 | The frequency [s] of collecting metrics from redis database. Use **0** or **-1** to not periodically collect metrics |

### Configuration util

Expand Down
25 changes: 1 addition & 24 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.swisspush.redisques;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonArray;
Expand All @@ -14,7 +12,6 @@
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import static java.lang.Long.compare;
Expand Down Expand Up @@ -49,18 +46,14 @@ public class QueueStatsService {
private final RedisQuesExceptionFactory exceptionFactory;
private final Semaphore incomingRequestQuota;

private final AtomicLong maxQueueSize = new AtomicLong(0);

public QueueStatsService(
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota,
MeterRegistry meterRegistry,
String metricsIdentifier
Semaphore incomingRequestQuota
) {
this.vertx = vertx;
this.eventBus = eventBus;
Expand All @@ -69,13 +62,6 @@ public QueueStatsService(
this.dequeueStatisticCollector = dequeueStatisticCollector;
this.exceptionFactory = exceptionFactory;
this.incomingRequestQuota = incomingRequestQuota;

if (meterRegistry != null) {
Gauge.builder(MetricMeter.MAX_QUEUE_SIZE.getId(), maxQueueSize, AtomicLong::get).
description(MetricMeter.MAX_QUEUE_SIZE.getDescription())
.tag(MetricTags.IDENTIFIER.getId(), metricsIdentifier).
register(meterRegistry);
}
}

public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
Expand Down Expand Up @@ -174,19 +160,10 @@ private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsu
int limit = req.mentor.limit(req.mCtx);
if (limit != 0 && queues.size() > limit) queues = queues.subList(0, limit);
req.queues = queues;
collectMaxQueueSize(queues);
onDone.accept(null, req);
});
}

private void collectMaxQueueSize(List<Queue> queues) {
if (queues.isEmpty()) {
maxQueueSize.set(0);
} else {
maxQueueSize.set(queues.get(0).getSize());
}
}

private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
long begGetQueueStatsMs = currentTimeMillis();
assert req.queueNames != null;
Expand Down
25 changes: 16 additions & 9 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.lock.Lock;
import org.swisspush.redisques.lock.impl.RedisBasedLock;
import org.swisspush.redisques.metrics.PeriodicMetricsCollector;
import org.swisspush.redisques.metrics.MetricsCollector;
import org.swisspush.redisques.metrics.MetricsCollectorScheduler;
import org.swisspush.redisques.performance.UpperBoundParallel;
import org.swisspush.redisques.scheduling.PeriodicSkipScheduler;
import org.swisspush.redisques.util.*;
Expand Down Expand Up @@ -363,14 +364,16 @@ private void initMicrometerMetrics(RedisquesConfiguration modConfig) {
meterRegistry = BackendRegistries.getDefaultNow();
}
String metricsIdentifier = modConfig.getMicrometerMetricsIdentifier();
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
.description(MetricMeter.DEQUEUE.getDescription()).tag(MetricTags.IDENTIFIER.getId(), metricsIdentifier).register(meterRegistry);

String address = modConfig.getAddress();
int metricRefreshPeriod = modConfig.getMetricRefreshPeriod();
String identifier = modConfig.getMicrometerMetricsIdentifier();
new PeriodicMetricsCollector(vertx, uid, periodicSkipScheduler, address, identifier, meterRegistry, lock,
metricRefreshPeriod);
if (metricRefreshPeriod > 0) {
String identifier = modConfig.getMicrometerMetricsIdentifier();
MetricsCollector metricsCollector = new MetricsCollector(vertx, uid, address, identifier, meterRegistry, lock, metricRefreshPeriod);
new MetricsCollectorScheduler(vertx, metricsCollector, metricRefreshPeriod);
}
}

private void initialize() {
Expand All @@ -386,8 +389,8 @@ private void initialize() {
}

RedisquesHttpRequestHandler.init(
vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota, meterRegistry);
vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota);

// only initialize memoryUsageProvider when not provided in the constructor
if (memoryUsageProvider == null) {
Expand Down Expand Up @@ -511,9 +514,13 @@ private void registerNotExpiredQueueCheck() {
});
}

private void registerMetricsGathering(RedisquesConfiguration configuration){
private void registerMetricsGathering(RedisquesConfiguration configuration) {
if (!configuration.getRedisMonitoringEnabled()) {
return;
}

String metricsAddress = configuration.getPublishMetricsAddress();
if(Strings.isNullOrEmpty(metricsAddress)) {
if (Strings.isNullOrEmpty(metricsAddress)) {
return;
}
String metricStorageName = configuration.getMetricStorageName();
Expand Down
50 changes: 32 additions & 18 deletions src/main/java/org/swisspush/redisques/action/MonitorAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.util.RedisquesConfiguration;

import java.util.Base64;

import static org.swisspush.redisques.util.RedisquesAPI.*;
import static org.swisspush.redisques.util.RedisquesAPI.LIMIT;

Expand All @@ -24,7 +27,7 @@ public MonitorAction(RedisquesConfiguration modConfig, HttpClient client, Logger

@Override
public void execute(Message<JsonObject> event) {
if(!modConfig.getHttpRequestHandlerEnabled()) {
if (!modConfig.getHttpRequestHandlerEnabled()) {
event.reply(createErrorReply().put(MESSAGE, "HttpRequestHandler is disabled"));
return;
}
Expand All @@ -34,22 +37,33 @@ public void execute(Message<JsonObject> event) {

String requestParams = "?limit=" + limit + "&emptyQueues=" + emptyQueues;

client.request(HttpMethod.GET, modConfig.getHttpRequestHandlerPort(), "localhost",
modConfig.getHttpRequestHandlerPrefix()+ "/monitor" + requestParams)
.compose(req -> req.send().compose(response -> {
if (response.statusCode() == 200) {
return response.body();
} else {
throw new RuntimeException("Failed to fetch monitor data: " + response.statusMessage());
}
}))
.onComplete(ar -> {
if (ar.succeeded()) {
event.reply(createOkReply().put(VALUE, ar.result().toJsonObject()));
} else {
event.reply(createErrorReply().put(MESSAGE, ar.cause().getMessage()));
log.warn("Failed to fetch monitor data", ar.cause());
}
});
RequestOptions requestOptions = new RequestOptions()
.setMethod(HttpMethod.GET)
.setPort(modConfig.getHttpRequestHandlerPort())
.setHost("localhost")
.setURI(modConfig.getHttpRequestHandlerPrefix() + "/monitor" + requestParams);

if(modConfig.getHttpRequestHandlerAuthenticationEnabled()) {
String credentials = modConfig.getHttpRequestHandlerUsername() + ":" + modConfig.getHttpRequestHandlerPassword();
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
requestOptions.putHeader("Authorization", "Basic " + encodedCredentials);
}

client.request(requestOptions)
.compose(req -> req.send().compose(response -> {
if (response.statusCode() == 200) {
return response.body();
} else {
throw new RuntimeException("Failed to fetch monitor data: " + response.statusMessage());
}
}))
.onComplete(ar -> {
if (ar.succeeded()) {
event.reply(createOkReply().put(VALUE, ar.result().toJsonObject()));
} else {
event.reply(createErrorReply().put(MESSAGE, ar.cause().getMessage()));
log.warn("Failed to fetch monitor data", ar.cause());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.swisspush.redisques.handler;

import io.micrometer.core.instrument.MeterRegistry;
import io.netty.util.internal.StringUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -84,14 +83,14 @@ public class RedisquesHttpRequestHandler implements Handler<HttpServerRequest> {
public static void init(
Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector, RedisQuesExceptionFactory exceptionFactory,
Semaphore queueStatsRequestQuota, MeterRegistry meterRegistry
Semaphore queueStatsRequestQuota
) {
log.info("Enabling http request handler: {}", modConfig.getHttpRequestHandlerEnabled());
if (modConfig.getHttpRequestHandlerEnabled()) {
if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) {
var handler = new RedisquesHttpRequestHandler(
vertx, modConfig, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota, meterRegistry);
exceptionFactory, queueStatsRequestQuota);
// in Vert.x 2x 100-continues was activated per default, in vert.x 3x it is off per default.
HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true);
vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> {
Expand Down Expand Up @@ -126,8 +125,7 @@ private RedisquesHttpRequestHandler(
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore queueStatsRequestQuota,
MeterRegistry meterRegistry
Semaphore queueStatsRequestQuota
) {
this.vertx = vertx;
this.router = Router.router(vertx);
Expand All @@ -140,7 +138,7 @@ private RedisquesHttpRequestHandler(
this.exceptionFactory = exceptionFactory;
this.queueStatsService = new QueueStatsService(
vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota, meterRegistry, modConfig.getMicrometerMetricsIdentifier());
exceptionFactory, queueStatsRequestQuota);

final String prefix = modConfig.getHttpRequestHandlerPrefix();

Expand Down
Loading
Loading