Skip to content

Commit

Permalink
refactor(proxy): optimize test performance and fix flaky test
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored and daniel-y committed Oct 30, 2023
1 parent cc2c072 commit 1f7e671
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ public void run() {
}

protected void tryRevive(TimerTag timerTag) {
this.reviveTimestamp = System.currentTimeMillis();
backgroundExecutor.execute(new ReviveTask(timerTag.payloadAsByteBuffer()));
reviveTimestamp = timerTag.deliveryTimestamp();
}

public long reviveTimestamp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -195,7 +196,7 @@ public void pop_order() {
}
List<String> receiptHandles = new ArrayList<>();
// 2. pop 3 message
int invisibleDuration = 800;
int invisibleDuration = 100;
PopResult popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, true, false, invisibleDuration).join();
assertEquals(3, popResult.messageList().size());
for (FlatMessageExt message : popResult.messageList()) {
Expand Down Expand Up @@ -223,6 +224,7 @@ public void pop_order() {
messageStore.ack(receiptHandles.get(2)).join();

// 9. pop again
long reviveTimestamp = System.currentTimeMillis() + invisibleDuration;
popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, true, false, invisibleDuration).join();
assertEquals(PopResult.Status.FOUND, popResult.status());
assertEquals(2, popResult.messageList().size());
Expand All @@ -237,8 +239,8 @@ public void pop_order() {
assertEquals(PopResult.Status.LOCKED, popResult.status());

// 11. after 1100ms, pop again
long reviveTimestamp = System.currentTimeMillis() + invisibleDuration;
await().until(() -> reviveService.reviveTimestamp() > reviveTimestamp);
await().atMost(2, TimeUnit.SECONDS)
.until(() -> reviveService.reviveTimestamp() >= reviveTimestamp && reviveService.inflightReviveCount() == 0);

popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, true, false, invisibleDuration).join();
assertEquals(PopResult.Status.FOUND, popResult.status());
Expand All @@ -260,7 +262,7 @@ public void pop_snapshot() {
}
List<String> receiptHandles = new ArrayList<>();
// 2. pop 3 message
int invisibleDuration = 800;
int invisibleDuration = 100;
PopResult popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, false, false, invisibleDuration).join();
assertEquals(3, popResult.messageList().size());
for (FlatMessageExt message : popResult.messageList()) {
Expand All @@ -272,6 +274,7 @@ public void pop_snapshot() {
messageStore.ack(receiptHandles.get(2)).join();

// 4. pop 3 message
long reviveTimestamp = System.currentTimeMillis() + invisibleDuration;
popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, false, false, invisibleDuration).join();
assertEquals(2, popResult.messageList().size());
for (FlatMessageExt message : popResult.messageList()) {
Expand All @@ -282,7 +285,6 @@ public void pop_snapshot() {
popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, false, false, invisibleDuration).join();
assertEquals(PopResult.Status.END_OF_QUEUE, popResult.status());

long reviveTimestamp = System.currentTimeMillis() + invisibleDuration;
popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, false, true, invisibleDuration).join();
assertEquals(PopResult.Status.END_OF_QUEUE, popResult.status());

Expand All @@ -295,7 +297,7 @@ public void pop_snapshot() {
assertEquals(1, streamStore.nextOffset(snapshotStream.getStreamId()));

// 6. after 1100ms, pop again
await().until(() -> reviveService.reviveTimestamp() > reviveTimestamp);
await().until(() -> reviveService.reviveTimestamp() >= reviveTimestamp);

popResult = messageStore.pop(CONSUMER_GROUP_ID, TOPIC_ID, QUEUE_ID, Filter.DEFAULT_FILTER, 3, false, false, invisibleDuration).join();
assertEquals(PopResult.Status.END_OF_QUEUE, popResult.status());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ void tryRevive_normal() throws StoreException {
FlatMessage message = FlatMessage.getRootAsFlatMessage(buildMessage(TOPIC_ID, QUEUE_ID, "TagA"));
logicQueue.put(message).join();
// pop message
int invisibleDuration = 1000;
int invisibleDuration = 100;
long reviveTimestamp = System.currentTimeMillis() + invisibleDuration;
PopResult popResult = logicQueue.popNormal(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join();
assertEquals(1, popResult.messageList().size());
// check ck exist
Expand All @@ -118,32 +119,28 @@ void tryRevive_normal() throws StoreException {
timerService.dequeue();
ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId()));
assertNotNull(ckValue);
// after 1s revive can clear ck
long reviveTimestamp = System.currentTimeMillis() + invisibleDuration;
// Wait clearing ck.
await().until(() -> {
timerService.dequeue();
return reviveService.reviveTimestamp() >= reviveTimestamp && reviveService.inflightReviveCount() == 0;
byte[] ck = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId()));
return ck == null;
});

ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId()));
assertNull(ckValue);

// check if this message has been appended to retry stream
PullResult retryPullResult = logicQueue.pullRetry(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 0, invisibleDuration).join();
assertEquals(1, retryPullResult.messageList().size());

// pop retry
long reviveTimestamp1 = System.currentTimeMillis() + invisibleDuration;
PopResult retryPopResult = logicQueue.popRetry(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join();
assertEquals(1, retryPopResult.messageList().size());
FlatMessageExt msg = retryPopResult.messageList().get(0);
assertEquals(2, msg.deliveryAttempts());

long reviveTimestamp1 = System.currentTimeMillis() + invisibleDuration;
// after 1s
await().until(() -> {
timerService.dequeue();
long ts0 = reviveService.reviveTimestamp();
return ts0 >= reviveTimestamp1;
return reviveService.reviveTimestamp() >= reviveTimestamp1;
});
// wait inflight all complete
await().until(() -> reviveService.inflightReviveCount() == 0);
Expand Down Expand Up @@ -173,7 +170,8 @@ void tryRevive_fifo() throws StoreException {
logicQueue.put(message).join();
}
// pop message
int invisibleDuration = 1000;
int invisibleDuration = 100;
long reviveTimestamp = System.currentTimeMillis() + invisibleDuration;
PopResult popResult = logicQueue.popFifo(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join();
assertEquals(1, popResult.messageList().size());
// check ck exist
Expand All @@ -185,33 +183,27 @@ void tryRevive_fifo() throws StoreException {
ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId()));
assertNotNull(ckValue);
// after 1s revive can clear ck
long reviveTimestamp = System.currentTimeMillis() + invisibleDuration;
await().until(() -> {
timerService.dequeue();
return reviveService.reviveTimestamp() >= reviveTimestamp && reviveService.inflightReviveCount() == 0;
byte[] ck = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId()));
return ck == null;
});

ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId()));
assertNull(ckValue);

// pop again
long reviveTimestamp1 = System.currentTimeMillis() + invisibleDuration;
PopResult retryPopResult = logicQueue.popFifo(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join();
assertEquals(1, retryPopResult.messageList().size());
FlatMessageExt msg = retryPopResult.messageList().get(0);
assertEquals(2, msg.deliveryAttempts());
assertEquals(0, msg.offset());

long reviveTimestamp1 = System.currentTimeMillis() + invisibleDuration;
// after 1s
await().until(() -> {
timerService.dequeue();
long ts0 = reviveService.reviveTimestamp();
return ts0 >= reviveTimestamp1;
return reviveService.reviveTimestamp() >= reviveTimestamp1;
});
// wait inflight all complete
await().until(() -> {
return reviveService.inflightReviveCount() == 0;
});
await().until(() -> reviveService.inflightReviveCount() == 0);

// check if this message has been sent to DLQ
Mockito.verify(dlqSender, Mockito.times(1))
Expand Down Expand Up @@ -243,7 +235,7 @@ void tryRevive_dead_letter() throws Exception {
FlatMessage message = FlatMessage.getRootAsFlatMessage(buildMessage(TOPIC_ID, QUEUE_ID, "TagA"));
logicQueue.put(message).join();
// pop message
int invisibleDuration = 1000;
int invisibleDuration = 100;
long reviveTimestamp = System.currentTimeMillis() + invisibleDuration;
PopResult popResult = logicQueue.popNormal(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 1, invisibleDuration).join();
assertEquals(1, popResult.messageList().size());
Expand All @@ -255,15 +247,15 @@ void tryRevive_dead_letter() throws Exception {
timerService.dequeue();
ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId()));
assertNotNull(ckValue);
// after 1s revive can clear ck

// Wait clearing ck.
ReceiptHandle finalHandle = handle;
await().until(() -> {
timerService.dequeue();
return reviveService.reviveTimestamp() >= reviveTimestamp && reviveService.inflightReviveCount() == 0;
byte[] ck = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, finalHandle.consumerGroupId(), finalHandle.operationId()));
return ck == null;
});

ckValue = kvService.get(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointKey(TOPIC_ID, QUEUE_ID, handle.consumerGroupId(), handle.operationId()));
assertNull(ckValue);

// check if this message has been appended to retry stream
PullResult retryPullResult = logicQueue.pullRetry(CONSUMER_GROUP_ID, Filter.DEFAULT_FILTER, 0, 32).join();
assertEquals(1, retryPullResult.messageList().size());
Expand Down

0 comments on commit 1f7e671

Please sign in to comment.