Skip to content

Commit

Permalink
fix(store): fix bug in decoding reset consume offset operation (#572)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Nov 6, 2023
1 parent f738261 commit 4fb98ce
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.rocketmq.store.model.operation;

import com.automq.rocketmq.store.api.MessageStateMachine;
import java.util.Objects;

public class ResetConsumeOffsetOperation extends Operation {

Expand Down Expand Up @@ -50,4 +51,18 @@ public long offset() {
return offset;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ResetConsumeOffsetOperation operation = (ResetConsumeOffsetOperation) o;
return consumerGroupId == operation.consumerGroupId && topicId == operation.topicId && queueId == operation.queueId && offset == operation.offset() && operationTimestamp == operation.operationTimestamp;
}

@Override
public int hashCode() {
return Objects.hash(consumerGroupId, topicId, queueId, offset, operationTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ public static Operation decodeOperation(ByteBuffer buffer,
case com.automq.rocketmq.store.model.generated.Operation.ResetConsumeOffsetOperation -> {
com.automq.rocketmq.store.model.generated.ResetConsumeOffsetOperation resetConsumeOffsetOperation = (com.automq.rocketmq.store.model.generated.ResetConsumeOffsetOperation) operationLogItem.operation(new com.automq.rocketmq.store.model.generated.ResetConsumeOffsetOperation());
return new ResetConsumeOffsetOperation(resetConsumeOffsetOperation.topicId(), resetConsumeOffsetOperation.queueId(),
operationStreamId, snapshotStreamId, stateMachine,
resetConsumeOffsetOperation.operationTimestamp(), resetConsumeOffsetOperation.consumerGroupId(), resetConsumeOffsetOperation.offset());
operationStreamId, snapshotStreamId, stateMachine, resetConsumeOffsetOperation.consumerGroupId(), resetConsumeOffsetOperation.offset(),
resetConsumeOffsetOperation.operationTimestamp());
}
default ->
throw new IllegalStateException("Unexpected operation type: " + operationLogItem.operationType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.automq.rocketmq.store.model.operation.ChangeInvisibleDurationOperation;
import com.automq.rocketmq.store.model.operation.OperationSnapshot;
import com.automq.rocketmq.store.model.operation.PopOperation.PopOperationType;
import com.automq.rocketmq.store.model.operation.ResetConsumeOffsetOperation;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -142,6 +143,16 @@ void encodeChangeInvisibleDurationOperation() {
assertEquals(changeInvisibleDurationOperation, decodedOperation);
}

@Test
void encodeResetConsumerOffset() {
ResetConsumeOffsetOperation resetConsumeOffsetOperation = new ResetConsumeOffsetOperation(
TOPIC_ID, QUEUE_ID, OPERATION_STREAM_ID, SNAPSHOT_STREAM_ID, null, CONSUMER_GROUP_ID, 0, OPERATION_TIMESTAMP
);
byte[] bytes = SerializeUtil.encodeResetConsumeOffsetOperation(resetConsumeOffsetOperation);
com.automq.rocketmq.store.model.operation.Operation decodedOperation = SerializeUtil.decodeOperation(ByteBuffer.wrap(bytes), null, OPERATION_STREAM_ID, SNAPSHOT_STREAM_ID);
assertEquals(resetConsumeOffsetOperation, decodedOperation);
}

@Test
void encodeOperationSnapshot() {
ConcurrentSkipListMap<Long, Integer> consumeTimes = new ConcurrentSkipListMap<>();
Expand Down

0 comments on commit 4fb98ce

Please sign in to comment.