Skip to content

Commit

Permalink
Merge branch 'develop' into remove-empty-queue-from-myqueues
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/java/org/swisspush/redisques/RedisQues.java
  • Loading branch information
Xin Zheng committed Jan 21, 2025
2 parents aa6a788 + 716a61b commit bd3fff4
Show file tree
Hide file tree
Showing 30 changed files with 1,713 additions and 247 deletions.
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The following configuration values are available:
| redisReconnectAttempts | 0 | The amount of attempts to reconnect when redis connection is lost. Use **0** to not reconnect at all or **-1** to reconnect indefinitely. |
| redisReconnectDelaySec | 30 | The interval [s] to attempt to reconnect when redis connection is lost. |
| redisPoolRecycleTimeoutMs | 180000 | The timeout [ms] when the connection pool is recycled. Use **-1** when having reconnect feature enabled. |
| redisMonitoringEnabled | true | Enable / disable monitoring of redis metrics |
| micrometerMetricsEnabled | false | Enable / disable collection of metrics using micrometer |
| micrometerMetricsIdentifier | default | Identifier to track values from multiple redisques instances |
| httpRequestHandlerEnabled | false | Enable / disable the HTTP API |
Expand All @@ -80,7 +81,7 @@ The following configuration values are available:
| dequeueStatisticReportIntervalSec | -1 | The interval [s] to publish the dequeue statistics into shared map. Use **-1** to not publish at all. In a hazelcast-cluster environment need config Semaphore first, see: [Semaphore Config](#Semaphore Config) |
| publish-metrics-address | | The EventBus address to send collected redis metrics to |
| metric-storage-name | queue | The name of the storage used in the published metrics |
| metric-refresh-period | 10 | The frequency [s] of collecting metrics from redis database |
| metric-refresh-period | 0 | The frequency [s] of collecting metrics from redis database. Use **0** or **-1** to not periodically collect metrics |

### Configuration util

Expand Down Expand Up @@ -323,6 +324,27 @@ Response Data
}
```

#### monitor

Request Data
```
{
"operation": "monitor",
"payload": {
"emptyQueues": <boolean value to define whether to include empty queues or not>,
"limit": <limit the amount of queues to return>
}
}
```

Response Data
```
{
"status": "ok" / "error",
"value": <objArr RESULT>
}
```

#### getQueueItems

Request Data
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
<version>4.1.10-SNAPSHOT</version>
<version>4.1.11-SNAPSHOT</version>
<name>redisques</name>
<description>
A highly scalable redis-persistent queuing system for vertx
Expand Down
25 changes: 1 addition & 24 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.swisspush.redisques;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonArray;
Expand All @@ -14,7 +12,6 @@
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import static java.lang.Long.compare;
Expand Down Expand Up @@ -49,18 +46,14 @@ public class QueueStatsService {
private final RedisQuesExceptionFactory exceptionFactory;
private final Semaphore incomingRequestQuota;

private final AtomicLong maxQueueSize = new AtomicLong(0);

public QueueStatsService(
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota,
MeterRegistry meterRegistry,
String metricsIdentifier
Semaphore incomingRequestQuota
) {
this.vertx = vertx;
this.eventBus = eventBus;
Expand All @@ -69,13 +62,6 @@ public QueueStatsService(
this.dequeueStatisticCollector = dequeueStatisticCollector;
this.exceptionFactory = exceptionFactory;
this.incomingRequestQuota = incomingRequestQuota;

if (meterRegistry != null) {
Gauge.builder(MetricMeter.MAX_QUEUE_SIZE.getId(), maxQueueSize, AtomicLong::get).
description(MetricMeter.MAX_QUEUE_SIZE.getDescription())
.tag(MetricTags.IDENTIFIER.getId(), metricsIdentifier).
register(meterRegistry);
}
}

public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
Expand Down Expand Up @@ -174,19 +160,10 @@ private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsu
int limit = req.mentor.limit(req.mCtx);
if (limit != 0 && queues.size() > limit) queues = queues.subList(0, limit);
req.queues = queues;
collectMaxQueueSize(queues);
onDone.accept(null, req);
});
}

private void collectMaxQueueSize(List<Queue> queues) {
if (queues.isEmpty()) {
maxQueueSize.set(0);
} else {
maxQueueSize.set(queues.get(0).getSize());
}
}

private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
long begGetQueueStatsMs = currentTimeMillis();
assert req.queueNames != null;
Expand Down
107 changes: 33 additions & 74 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.micrometer.backends.BackendRegistries;
Expand All @@ -24,7 +25,10 @@
import org.swisspush.redisques.action.QueueAction;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.metrics.PeriodicMetricsCollector;
import org.swisspush.redisques.lock.Lock;
import org.swisspush.redisques.lock.impl.RedisBasedLock;
import org.swisspush.redisques.metrics.MetricsCollector;
import org.swisspush.redisques.metrics.MetricsCollectorScheduler;
import org.swisspush.redisques.performance.UpperBoundParallel;
import org.swisspush.redisques.scheduling.PeriodicSkipScheduler;
import org.swisspush.redisques.util.*;
Expand All @@ -48,37 +52,8 @@

import static java.lang.System.currentTimeMillis;
import static org.swisspush.redisques.exception.RedisQuesExceptionFactory.newThriftyExceptionFactory;
import static org.swisspush.redisques.util.RedisquesAPI.ERROR;
import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE;
import static org.swisspush.redisques.util.RedisquesAPI.OK;
import static org.swisspush.redisques.util.RedisquesAPI.OPERATION;
import static org.swisspush.redisques.util.RedisquesAPI.PAYLOAD;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.addQueueItem;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.bulkDeleteLocks;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.bulkDeleteQueues;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.bulkPutLocks;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteAllLocks;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteAllQueueItems;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteLock;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.deleteQueueItem;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.enqueue;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getAllLocks;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getConfiguration;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getLock;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueueItem;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueueItems;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueueItemsCount;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueues;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesCount;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesItemsCount;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesSpeed;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.getQueuesStatistics;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.lockedEnqueue;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.putLock;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.replaceQueueItem;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.setConfiguration;
import static org.swisspush.redisques.util.RedisquesAPI.STATUS;
import static org.swisspush.redisques.util.RedisquesAPI.*;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.*;

public class RedisQues extends AbstractVerticle {

Expand Down Expand Up @@ -269,6 +244,7 @@ private class QueueProcessStats {
private final Semaphore queueStatsRequestQuota;
private final Semaphore getQueuesItemsCountRedisRequestQuota;
private final Semaphore activeQueueRegRefreshReqQuota;
private Lock lock;

public RedisQues() {
this(null, null, null, newThriftyExceptionFactory(), new Semaphore(Integer.MAX_VALUE),
Expand Down Expand Up @@ -379,10 +355,6 @@ public void start(Promise<Void> promise) {
RedisquesConfiguration modConfig = configurationProvider.configuration();
log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration());

if(configurationProvider.configuration().getMicrometerMetricsEnabled()) {
initMicrometerMetrics(modConfig);
}

int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec();
if (modConfig.isDequeueStatsEnabled()) {
dequeueStatisticEnabled = true;
Expand Down Expand Up @@ -421,13 +393,16 @@ private void initMicrometerMetrics(RedisquesConfiguration modConfig) {
meterRegistry = BackendRegistries.getDefaultNow();
}
String metricsIdentifier = modConfig.getMicrometerMetricsIdentifier();
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
.description(MetricMeter.DEQUEUE.getDescription()).tag(MetricTags.IDENTIFIER.getId(), metricsIdentifier).register(meterRegistry);

String address = modConfig.getAddress();
int metricRefreshPeriod = modConfig.getMetricRefreshPeriod();
String identifier = modConfig.getMicrometerMetricsIdentifier();
new PeriodicMetricsCollector(vertx, periodicSkipScheduler, address, identifier, meterRegistry, metricRefreshPeriod);
if (metricRefreshPeriod > 0) {
String identifier = modConfig.getMicrometerMetricsIdentifier();
MetricsCollector metricsCollector = new MetricsCollector(vertx, uid, address, identifier, meterRegistry, lock, metricRefreshPeriod);
new MetricsCollectorScheduler(vertx, metricsCollector, metricRefreshPeriod);
}
}

private void initialize() {
Expand All @@ -436,19 +411,27 @@ private void initialize() {
redisProvider, queuesPrefix, vertx, exceptionFactory, redisMonitoringReqQuota,
configuration.getQueueSpeedIntervalSec());

this.lock = new RedisBasedLock(redisProvider, exceptionFactory);

if(configurationProvider.configuration().getMicrometerMetricsEnabled()) {
initMicrometerMetrics(configuration);
}

RedisquesHttpRequestHandler.init(
vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota, meterRegistry);
vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota);

// only initialize memoryUsageProvider when not provided in the constructor
if (memoryUsageProvider == null) {
memoryUsageProvider = new DefaultMemoryUsageProvider(redisProvider, vertx,
configurationProvider.configuration().getMemoryUsageCheckIntervalSec());
}

HttpClient client = vertx.createHttpClient();

assert getQueuesItemsCountRedisRequestQuota != null;
queueActionFactory = new QueueActionFactory(
redisProvider, vertx, log, queuesKey, queuesPrefix, consumersPrefix, locksKey,
redisProvider, vertx, client, log, queuesKey, queuesPrefix, consumersPrefix, locksKey,
memoryUsageProvider, queueStatisticsCollector, exceptionFactory,
configurationProvider, getQueuesItemsCountRedisRequestQuota, meterRegistry);

Expand Down Expand Up @@ -476,6 +459,7 @@ private void initialize() {
queueActions.put(getQueuesStatistics, queueActionFactory.buildQueueAction(getQueuesStatistics));
queueActions.put(setConfiguration, queueActionFactory.buildQueueAction(setConfiguration));
queueActions.put(getConfiguration, queueActionFactory.buildQueueAction(getConfiguration));
queueActions.put(monitor, queueActionFactory.buildQueueAction(monitor));

String address = configuration.getAddress();

Expand Down Expand Up @@ -569,9 +553,13 @@ private void registerNotExpiredQueueCheck() {
});
}

private void registerMetricsGathering(RedisquesConfiguration configuration){
private void registerMetricsGathering(RedisquesConfiguration configuration) {
if (!configuration.getRedisMonitoringEnabled()) {
return;
}

String metricsAddress = configuration.getPublishMetricsAddress();
if(Strings.isNullOrEmpty(metricsAddress)) {
if (Strings.isNullOrEmpty(metricsAddress)) {
return;
}
String metricStorageName = configuration.getMetricStorageName();
Expand Down Expand Up @@ -1280,7 +1268,7 @@ private Future<Void> notifyConsumer(final String queueName) {
log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName);
redisAPI.del(Collections.singletonList(key), result -> {
if (result.failed()) {
log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(event.cause()));
log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(result.cause()));
} else {
log.debug("{} consumer key removed", result.result().toLong());
}
Expand Down Expand Up @@ -1528,33 +1516,4 @@ private QueueConfiguration findQueueConfiguration(String queueName) {
}
return null;
}

private class FailedAsyncResult<Response> implements AsyncResult<Response> {

private final Throwable cause;

private FailedAsyncResult(Throwable cause) {
this.cause = cause;
}

@Override
public Response result() {
return null;
}

@Override
public Throwable cause() {
return cause;
}

@Override
public boolean succeeded() {
return false;
}

@Override
public boolean failed() {
return true;
}
}
}
Loading

0 comments on commit bd3fff4

Please sign in to comment.