Skip to content

Commit

Permalink
use periodic to cleanup the myqueues
Browse files Browse the repository at this point in the history
  • Loading branch information
Xin Zheng committed Jan 23, 2025
1 parent a202d52 commit 480e49f
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,8 @@ private void registerMyqueuesCleanup() {
});
}
}));
} else {
log.debug("queue {} is empty since: {}", queueName, state.lastConsumedTimestampMillis);
}
}
});
Expand Down Expand Up @@ -989,7 +991,10 @@ private Future<Void> consume(final String queueName) {
if (uid.equals(consumer)) {
QueueState state = myQueues.get(queueName).state;
log.trace("RedisQues consumer: {} queue: {} state: {}", consumer, queueName, state);
// Get the next message only once the previous has
// been completely processed
if (state != QueueState.CONSUMING) {
setMyQueuesState(queueName, QueueState.CONSUMING);
if (state == null) {
// No previous state was stored. Maybe the
// consumer was restarted
Expand Down Expand Up @@ -1141,9 +1146,8 @@ private Future<Void> readQueue(final String queueName) {
} else {
// This can happen when requests to consume happen at the same moment the queue is emptied.
log.debug("Got a request to consume from empty queue {}", queueName);

setMyQueuesState(queueName, QueueState.READY);
log.debug("queue {} is empty since: {}", queueName, myQueues.get(queueName).lastConsumedTimestampMillis);

if (dequeueStatisticEnabled) {
dequeueStatistic.computeIfPresent(queueName, (s, dequeueStatistic) -> {
dequeueStatistic.setMarkedForRemoval();
Expand Down

0 comments on commit 480e49f

Please sign in to comment.