From 4fb98ce35ad7ddc76e5a7beb2fcaaf61cf11f89b Mon Sep 17 00:00:00 2001 From: SSpirits Date: Mon, 6 Nov 2023 11:08:58 +0800 Subject: [PATCH] fix(store): fix bug in decoding reset consume offset operation (#572) Signed-off-by: SSpirits --- .../operation/ResetConsumeOffsetOperation.java | 15 +++++++++++++++ .../automq/rocketmq/store/util/SerializeUtil.java | 4 ++-- .../rocketmq/store/util/SerializeUtilTest.java | 11 +++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/store/src/main/java/com/automq/rocketmq/store/model/operation/ResetConsumeOffsetOperation.java b/store/src/main/java/com/automq/rocketmq/store/model/operation/ResetConsumeOffsetOperation.java index 4012f8ca5..405e40d14 100644 --- a/store/src/main/java/com/automq/rocketmq/store/model/operation/ResetConsumeOffsetOperation.java +++ b/store/src/main/java/com/automq/rocketmq/store/model/operation/ResetConsumeOffsetOperation.java @@ -18,6 +18,7 @@ package com.automq.rocketmq.store.model.operation; import com.automq.rocketmq.store.api.MessageStateMachine; +import java.util.Objects; public class ResetConsumeOffsetOperation extends Operation { @@ -50,4 +51,18 @@ public long offset() { return offset; } + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + ResetConsumeOffsetOperation operation = (ResetConsumeOffsetOperation) o; + return consumerGroupId == operation.consumerGroupId && topicId == operation.topicId && queueId == operation.queueId && offset == operation.offset() && operationTimestamp == operation.operationTimestamp; + } + + @Override + public int hashCode() { + return Objects.hash(consumerGroupId, topicId, queueId, offset, operationTimestamp); + } } diff --git a/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java b/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java index 13e99fb6b..84ca8bd12 100644 --- a/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java +++ b/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java @@ -164,8 +164,8 @@ public static Operation decodeOperation(ByteBuffer buffer, case com.automq.rocketmq.store.model.generated.Operation.ResetConsumeOffsetOperation -> { com.automq.rocketmq.store.model.generated.ResetConsumeOffsetOperation resetConsumeOffsetOperation = (com.automq.rocketmq.store.model.generated.ResetConsumeOffsetOperation) operationLogItem.operation(new com.automq.rocketmq.store.model.generated.ResetConsumeOffsetOperation()); return new ResetConsumeOffsetOperation(resetConsumeOffsetOperation.topicId(), resetConsumeOffsetOperation.queueId(), - operationStreamId, snapshotStreamId, stateMachine, - resetConsumeOffsetOperation.operationTimestamp(), resetConsumeOffsetOperation.consumerGroupId(), resetConsumeOffsetOperation.offset()); + operationStreamId, snapshotStreamId, stateMachine, resetConsumeOffsetOperation.consumerGroupId(), resetConsumeOffsetOperation.offset(), + resetConsumeOffsetOperation.operationTimestamp()); } default -> throw new IllegalStateException("Unexpected operation type: " + operationLogItem.operationType()); diff --git a/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java b/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java index 425654c62..86a45f0ef 100644 --- a/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java @@ -23,6 +23,7 @@ import com.automq.rocketmq.store.model.operation.ChangeInvisibleDurationOperation; import com.automq.rocketmq.store.model.operation.OperationSnapshot; import com.automq.rocketmq.store.model.operation.PopOperation.PopOperationType; +import com.automq.rocketmq.store.model.operation.ResetConsumeOffsetOperation; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ConcurrentSkipListMap; @@ -142,6 +143,16 @@ void encodeChangeInvisibleDurationOperation() { assertEquals(changeInvisibleDurationOperation, decodedOperation); } + @Test + void encodeResetConsumerOffset() { + ResetConsumeOffsetOperation resetConsumeOffsetOperation = new ResetConsumeOffsetOperation( + TOPIC_ID, QUEUE_ID, OPERATION_STREAM_ID, SNAPSHOT_STREAM_ID, null, CONSUMER_GROUP_ID, 0, OPERATION_TIMESTAMP + ); + byte[] bytes = SerializeUtil.encodeResetConsumeOffsetOperation(resetConsumeOffsetOperation); + com.automq.rocketmq.store.model.operation.Operation decodedOperation = SerializeUtil.decodeOperation(ByteBuffer.wrap(bytes), null, OPERATION_STREAM_ID, SNAPSHOT_STREAM_ID); + assertEquals(resetConsumeOffsetOperation, decodedOperation); + } + @Test void encodeOperationSnapshot() { ConcurrentSkipListMap consumeTimes = new ConcurrentSkipListMap<>();