diff --git a/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java b/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java index 064e7dad8..ba60063e2 100644 --- a/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java +++ b/store/src/main/java/com/automq/rocketmq/store/service/ReviveService.java @@ -219,8 +219,8 @@ public void run() { } protected void tryRevive(TimerTag timerTag) { - this.reviveTimestamp = System.currentTimeMillis(); backgroundExecutor.execute(new ReviveTask(timerTag.payloadAsByteBuffer())); + reviveTimestamp = timerTag.deliveryTimestamp(); } public long reviveTimestamp() { diff --git a/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java b/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java index 17f743192..dc2358751 100644 --- a/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/MessageStoreTest.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -195,7 +196,7 @@ public void pop_order() { } List receiptHandles = new ArrayList<>(); // 2. pop 3 message - int invisibleDuration = 800; + int invisibleDuration = 100; PopResult popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, true, false, invisibleDuration).join(); assertEquals(3, popResult.messageList().size()); for (FlatMessageExt message : popResult.messageList()) { @@ -223,6 +224,7 @@ public void pop_order() { messageStore.ack(receiptHandles.get(2)).join(); // 9. pop again + long reviveTimestamp = System.currentTimeMillis() + invisibleDuration; popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, true, false, invisibleDuration).join(); assertEquals(PopResult.Status.FOUND, popResult.status()); assertEquals(2, popResult.messageList().size()); @@ -237,8 +239,8 @@ public void pop_order() { assertEquals(PopResult.Status.LOCKED, popResult.status()); // 11. after 1100ms, pop again - long reviveTimestamp = System.currentTimeMillis() + invisibleDuration; - await().until(() -> reviveService.reviveTimestamp() > reviveTimestamp); + await().atMost(2, TimeUnit.SECONDS) + .until(() -> reviveService.reviveTimestamp() >= reviveTimestamp && reviveService.inflightReviveCount() == 0); popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, true, false, invisibleDuration).join(); assertEquals(PopResult.Status.FOUND, popResult.status()); @@ -260,7 +262,7 @@ public void pop_snapshot() { } List receiptHandles = new ArrayList<>(); // 2. pop 3 message - int invisibleDuration = 800; + int invisibleDuration = 100; PopResult popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, false, false, invisibleDuration).join(); assertEquals(3, popResult.messageList().size()); for (FlatMessageExt message : popResult.messageList()) { @@ -272,6 +274,7 @@ public void pop_snapshot() { messageStore.ack(receiptHandles.get(2)).join(); // 4. pop 3 message + long reviveTimestamp = System.currentTimeMillis() + invisibleDuration; popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, false, false, invisibleDuration).join(); assertEquals(2, popResult.messageList().size()); for (FlatMessageExt message : popResult.messageList()) { @@ -282,7 +285,6 @@ public void pop_snapshot() { popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, false, false, invisibleDuration).join(); assertEquals(PopResult.Status.END_OF_QUEUE, popResult.status()); - long reviveTimestamp = System.currentTimeMillis() + invisibleDuration; popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, false, true, invisibleDuration).join(); assertEquals(PopResult.Status.END_OF_QUEUE, popResult.status()); @@ -295,7 +297,7 @@ public void pop_snapshot() { assertEquals(1, streamStore.nextOffset(snapshotStream.getStreamId())); // 6. after 1100ms, pop again - await().until(() -> reviveService.reviveTimestamp() > reviveTimestamp); + await().until(() -> reviveService.reviveTimestamp() >= reviveTimestamp); popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, false, false, invisibleDuration).join(); assertEquals(PopResult.Status.END_OF_QUEUE, popResult.status()); diff --git a/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java b/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java index 5c16ff9ea..0c29957af 100644 --- a/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java @@ -107,7 +107,8 @@ void tryRevive_normal() throws StoreException { FlatMessage message = FlatMessage.getRootAsFlatMessage(buildMessage(TOPIC_ID, QUEUE_ID, "TagA")); logicQueue.put(message).join(); // pop message - int invisibleDuration = 1000; + int invisibleDuration = 100; + long reviveTimestamp = System.currentTimeMillis() + invisibleDuration; PopResult popResult = logicQueue.popNormal(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join(); assertEquals(1, popResult.messageList().size()); // check ck exist @@ -118,32 +119,28 @@ void tryRevive_normal() throws StoreException { timerService.dequeue(); ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId())); assertNotNull(ckValue); - // after 1s revive can clear ck - long reviveTimestamp = System.currentTimeMillis() + invisibleDuration; + // Wait clearing ck. await().until(() -> { timerService.dequeue(); - return reviveService.reviveTimestamp() >= reviveTimestamp && reviveService.inflightReviveCount() == 0; + byte[] ck = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId())); + return ck == null; }); - ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId())); - assertNull(ckValue); - // check if this message has been appended to retry stream PullResult retryPullResult = logicQueue.pullRetry(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 0, invisibleDuration).join(); assertEquals(1, retryPullResult.messageList().size()); // pop retry + long reviveTimestamp1 = System.currentTimeMillis() + invisibleDuration; PopResult retryPopResult = logicQueue.popRetry(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join(); assertEquals(1, retryPopResult.messageList().size()); FlatMessageExt msg = retryPopResult.messageList().get(0); assertEquals(2, msg.deliveryAttempts()); - long reviveTimestamp1 = System.currentTimeMillis() + invisibleDuration; // after 1s await().until(() -> { timerService.dequeue(); - long ts0 = reviveService.reviveTimestamp(); - return ts0 >= reviveTimestamp1; + return reviveService.reviveTimestamp() >= reviveTimestamp1; }); // wait inflight all complete await().until(() -> reviveService.inflightReviveCount() == 0); @@ -173,7 +170,8 @@ void tryRevive_fifo() throws StoreException { logicQueue.put(message).join(); } // pop message - int invisibleDuration = 1000; + int invisibleDuration = 100; + long reviveTimestamp = System.currentTimeMillis() + invisibleDuration; PopResult popResult = logicQueue.popFifo(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join(); assertEquals(1, popResult.messageList().size()); // check ck exist @@ -185,33 +183,27 @@ void tryRevive_fifo() throws StoreException { ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId())); assertNotNull(ckValue); // after 1s revive can clear ck - long reviveTimestamp = System.currentTimeMillis() + invisibleDuration; await().until(() -> { timerService.dequeue(); - return reviveService.reviveTimestamp() >= reviveTimestamp && reviveService.inflightReviveCount() == 0; + byte[] ck = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId())); + return ck == null; }); - ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId())); - assertNull(ckValue); - // pop again + long reviveTimestamp1 = System.currentTimeMillis() + invisibleDuration; PopResult retryPopResult = logicQueue.popFifo(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join(); assertEquals(1, retryPopResult.messageList().size()); FlatMessageExt msg = retryPopResult.messageList().get(0); assertEquals(2, msg.deliveryAttempts()); assertEquals(0, msg.offset()); - long reviveTimestamp1 = System.currentTimeMillis() + invisibleDuration; // after 1s await().until(() -> { timerService.dequeue(); - long ts0 = reviveService.reviveTimestamp(); - return ts0 >= reviveTimestamp1; + return reviveService.reviveTimestamp() >= reviveTimestamp1; }); // wait inflight all complete - await().until(() -> { - return reviveService.inflightReviveCount() == 0; - }); + await().until(() -> reviveService.inflightReviveCount() == 0); // check if this message has been sent to DLQ Mockito.verify(dlqSender, Mockito.times(1)) @@ -243,7 +235,7 @@ void tryRevive_dead_letter() throws Exception { FlatMessage message = FlatMessage.getRootAsFlatMessage(buildMessage(TOPIC_ID, QUEUE_ID, "TagA")); logicQueue.put(message).join(); // pop message - int invisibleDuration = 1000; + int invisibleDuration = 100; long reviveTimestamp = System.currentTimeMillis() + invisibleDuration; PopResult popResult = logicQueue.popNormal(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join(); assertEquals(1, popResult.messageList().size()); @@ -255,15 +247,15 @@ void tryRevive_dead_letter() throws Exception { timerService.dequeue(); ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId())); assertNotNull(ckValue); - // after 1s revive can clear ck + + // Wait clearing ck. + ReceiptHandle finalHandle = handle; await().until(() -> { timerService.dequeue(); - return reviveService.reviveTimestamp() >= reviveTimestamp && reviveService.inflightReviveCount() == 0; + byte[] ck = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, finalHandle.consumerGroupId(), finalHandle.operationId())); + return ck == null; }); - ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId())); - assertNull(ckValue); - // check if this message has been appended to retry stream PullResult retryPullResult = logicQueue.pullRetry(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 0, 32).join(); assertEquals(1, retryPullResult.messageList().size());