From 80417193a448e40163cef83b3e4b5719a1b5f609 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 21 Dec 2023 20:00:09 +0800 Subject: [PATCH] refactor(store): refactor MessageStateMachine Signed-off-by: SSpirits --- store/src/main/fbs/store.fbs | 3 - .../store/api/MessageStateMachine.java | 2 - .../model/metadata/ConsumerGroupMetadata.java | 35 +-- .../model/operation/OperationSnapshot.java | 35 +-- .../queue/DefaultLogicQueueStateMachine.java | 288 +++++++----------- .../store/queue/StreamLogicQueue.java | 8 +- .../store/service/RocksDBKVService.java | 24 ++ .../rocketmq/store/service/api/KVService.java | 2 + .../rocketmq/store/util/SerializeUtil.java | 40 +-- .../automq/rocketmq/store/LogicQueueTest.java | 2 +- .../metadata/ConsumerGroupMetadataTest.java | 21 +- .../store/queue/AckCommitterTest.java | 47 --- .../store/service/ReviveServiceTest.java | 5 +- .../store/util/SerializeUtilTest.java | 40 +-- 14 files changed, 187 insertions(+), 365 deletions(-) delete mode 100644 store/src/test/java/com/automq/rocketmq/store/queue/AckCommitterTest.java diff --git a/store/src/main/fbs/store.fbs b/store/src/main/fbs/store.fbs index 7084bef26..a01924ae2 100644 --- a/store/src/main/fbs/store.fbs +++ b/store/src/main/fbs/store.fbs @@ -78,11 +78,8 @@ table ConsumerGroupMetadata { consumer_group_id:long; consume_offset:long; ack_offset:long; - ack_bit_map:[byte]; retry_consume_offset:long; retry_ack_offset:long; - retry_ack_bit_map:[byte]; - consume_times:[ConsumeTimes]; // for identify the consumer group metadata version, usually it is the first operation id when the consumer group metadata is created or reset version: long; } diff --git a/store/src/main/java/com/automq/rocketmq/store/api/MessageStateMachine.java b/store/src/main/java/com/automq/rocketmq/store/api/MessageStateMachine.java index 8e404581c..e8582cf46 100644 --- a/store/src/main/java/com/automq/rocketmq/store/api/MessageStateMachine.java +++ b/store/src/main/java/com/automq/rocketmq/store/api/MessageStateMachine.java @@ -59,8 +59,6 @@ public interface MessageStateMachine { int consumeTimes(long consumerGroupId, long offset); - void registerAckOffsetListener(OffsetListener listener); - void registerRetryAckOffsetListener(OffsetListener listener); class ReplayPopResult { diff --git a/store/src/main/java/com/automq/rocketmq/store/model/metadata/ConsumerGroupMetadata.java b/store/src/main/java/com/automq/rocketmq/store/model/metadata/ConsumerGroupMetadata.java index e21050b6e..1399e0ffa 100644 --- a/store/src/main/java/com/automq/rocketmq/store/model/metadata/ConsumerGroupMetadata.java +++ b/store/src/main/java/com/automq/rocketmq/store/model/metadata/ConsumerGroupMetadata.java @@ -18,7 +18,6 @@ package com.automq.rocketmq.store.model.metadata; import java.util.Objects; -import java.util.concurrent.ConcurrentSkipListMap; public class ConsumerGroupMetadata { private final long consumerGroupId; @@ -26,24 +25,20 @@ public class ConsumerGroupMetadata { private long ackOffset; private long retryConsumeOffset; private long retryAckOffset; - private final ConcurrentSkipListMap consumeTimes; private final long version; public ConsumerGroupMetadata(long consumerGroupId) { this.consumerGroupId = consumerGroupId; - this.consumeTimes = new ConcurrentSkipListMap<>(); this.version = 0; } public ConsumerGroupMetadata(long consumerGroupId, long consumeOffset, long ackOffset, long retryConsumeOffset, - long retryAckOffset, ConcurrentSkipListMap consumeTimes, - long version) { + long retryAckOffset, long version) { this.consumerGroupId = consumerGroupId; this.consumeOffset = consumeOffset; this.ackOffset = ackOffset; this.retryConsumeOffset = retryConsumeOffset; this.retryAckOffset = retryAckOffset; - this.consumeTimes = consumeTimes; this.version = version; } @@ -71,22 +66,22 @@ public void setConsumeOffset(long consumeOffset) { this.consumeOffset = consumeOffset; } - public void setAckOffset(long ackOffset) { - this.ackOffset = ackOffset; - // when ack offset is updated, we should clear the consume times - this.consumeTimes.subMap(0L, ackOffset).clear(); + public void advanceAckOffset(long ackOffset) { + if (ackOffset > this.ackOffset) { + this.ackOffset = ackOffset; + } } - public void setRetryConsumeOffset(long retryConsumeOffset) { - this.retryConsumeOffset = retryConsumeOffset; - } - - public void setRetryAckOffset(long retryAckOffset) { - this.retryAckOffset = retryAckOffset; + public void advanceRetryConsumeOffset(long retryConsumeOffset) { + if (retryConsumeOffset > this.retryConsumeOffset) { + this.retryConsumeOffset = retryConsumeOffset; + } } - public ConcurrentSkipListMap getConsumeTimes() { - return consumeTimes; + public void advanceRetryAckOffset(long retryAckOffset) { + if (retryAckOffset > this.retryAckOffset) { + this.retryAckOffset = retryAckOffset; + } } public long getVersion() { @@ -100,11 +95,11 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; ConsumerGroupMetadata metadata = (ConsumerGroupMetadata) o; - return consumerGroupId == metadata.consumerGroupId && consumeOffset == metadata.consumeOffset && ackOffset == metadata.ackOffset && retryConsumeOffset == metadata.retryConsumeOffset && retryAckOffset == metadata.retryAckOffset && version == metadata.version && Objects.equals(consumeTimes, metadata.consumeTimes); + return consumerGroupId == metadata.consumerGroupId && consumeOffset == metadata.consumeOffset && ackOffset == metadata.ackOffset && retryConsumeOffset == metadata.retryConsumeOffset && retryAckOffset == metadata.retryAckOffset && version == metadata.version; } @Override public int hashCode() { - return Objects.hash(consumerGroupId, consumeOffset, ackOffset, retryConsumeOffset, retryAckOffset, consumeTimes, version); + return Objects.hash(consumerGroupId, consumeOffset, ackOffset, retryConsumeOffset, retryAckOffset, version); } } diff --git a/store/src/main/java/com/automq/rocketmq/store/model/operation/OperationSnapshot.java b/store/src/main/java/com/automq/rocketmq/store/model/operation/OperationSnapshot.java index 18099a138..f60c6a637 100644 --- a/store/src/main/java/com/automq/rocketmq/store/model/operation/OperationSnapshot.java +++ b/store/src/main/java/com/automq/rocketmq/store/model/operation/OperationSnapshot.java @@ -21,22 +21,23 @@ import com.automq.rocketmq.store.model.metadata.ConsumerGroupMetadata; import java.util.List; import java.util.Objects; -import java.util.concurrent.ConcurrentSkipListMap; public class OperationSnapshot { private final long snapshotEndOffset; private List checkPoints; private long kvServiceSnapshotVersion; - private final List consumerGroupMetadataList; + private final List consumerGroupMetadataList; - public OperationSnapshot(long snapshotEndOffset, long kvServiceSnapshotVersion, List consumerGroupMetadataList) { + public OperationSnapshot(long snapshotEndOffset, long kvServiceSnapshotVersion, + List consumerGroupMetadataList) { this.snapshotEndOffset = snapshotEndOffset; this.kvServiceSnapshotVersion = kvServiceSnapshotVersion; this.consumerGroupMetadataList = consumerGroupMetadataList; } - public OperationSnapshot(long snapshotEndOffset, List consumerGroupMetadataList, List checkPoints) { + public OperationSnapshot(long snapshotEndOffset, List consumerGroupMetadataList, + List checkPoints) { this.snapshotEndOffset = snapshotEndOffset; this.consumerGroupMetadataList = consumerGroupMetadataList; this.checkPoints = checkPoints; @@ -58,7 +59,7 @@ public long getKvServiceSnapshotVersion() { return kvServiceSnapshotVersion; } - public List getConsumerGroupMetadataList() { + public List getConsumerGroupMetadataList() { return consumerGroupMetadataList; } @@ -84,28 +85,4 @@ public String toString() { '}'; } - public static class ConsumerGroupMetadataSnapshot extends ConsumerGroupMetadata { - private final byte[] ackOffsetBitmapBuffer; - private final byte[] retryAckOffsetBitmapBuffer; - - public ConsumerGroupMetadataSnapshot(long consumerGroupId, long consumeOffset, long ackOffset, - long retryConsumeOffset, long retryAckOffset, - byte[] ackOffsetBitmapBuffer, byte[] retryAckOffsetBitmapBuffer, - ConcurrentSkipListMap consumeTimes, - long version) { - super(consumerGroupId, consumeOffset, ackOffset, retryConsumeOffset, retryAckOffset, consumeTimes, version); - this.ackOffsetBitmapBuffer = ackOffsetBitmapBuffer; - this.retryAckOffsetBitmapBuffer = retryAckOffsetBitmapBuffer; - } - - public byte[] getAckOffsetBitmapBuffer() { - return ackOffsetBitmapBuffer; - } - - public byte[] getRetryAckOffsetBitmapBuffer() { - return retryAckOffsetBitmapBuffer; - } - - } - } diff --git a/store/src/main/java/com/automq/rocketmq/store/queue/DefaultLogicQueueStateMachine.java b/store/src/main/java/com/automq/rocketmq/store/queue/DefaultLogicQueueStateMachine.java index 30b6d81bc..8254d0268 100644 --- a/store/src/main/java/com/automq/rocketmq/store/queue/DefaultLogicQueueStateMachine.java +++ b/store/src/main/java/com/automq/rocketmq/store/queue/DefaultLogicQueueStateMachine.java @@ -42,14 +42,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Consumer; import java.util.stream.Collectors; -import org.roaringbitmap.RoaringBitmap; -import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,8 +63,6 @@ public class DefaultLogicQueueStateMachine implements MessageStateMachine { private final long topicId; private final int queueId; private ConcurrentMap consumerGroupMetadataMap; - private final ConcurrentMap ackCommitterMap = new ConcurrentHashMap<>(); - private final ConcurrentMap retryAckCommitterMap = new ConcurrentHashMap<>(); private long currentOperationOffset = -1; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock reentrantLock = lock.readLock(); @@ -76,7 +70,6 @@ public class DefaultLogicQueueStateMachine implements MessageStateMachine { private final KVService kvService; private final TimerService timerService; private final String identity; - private final List ackOffsetListeners = new ArrayList<>(); private final List retryAckOffsetListeners = new ArrayList<>(); public DefaultLogicQueueStateMachine(long topicId, int queueId, KVService kvService, TimerService timerService) { @@ -98,16 +91,6 @@ public int queueId() { return queueId; } - @Override - public void registerAckOffsetListener(OffsetListener listener) { - this.ackOffsetListeners.add(listener); - } - - @Override - public void registerRetryAckOffsetListener(OffsetListener listener) { - this.retryAckOffsetListeners.add(listener); - } - @Override public ReplayPopResult replayPopOperation(long operationOffset, PopOperation operation) throws StoreException { reentrantLock.lock(); @@ -140,15 +123,10 @@ private ReplayPopResult replayPopNormalOperation(long operationOffset, // update consume offset, data or retry stream ConsumerGroupMetadata metadata = this.consumerGroupMetadataMap.computeIfAbsent(consumerGroupId, k -> new ConsumerGroupMetadata(consumerGroupId)); if (metadata.getConsumeOffset() < offset + 1) { + // if this is a pop-last operation, it only needs to update consume offset metadata.setConsumeOffset(offset + 1); } if (operation.isEndMark()) { - // if this is a pop-last operation, it only needs to update consume offset and advance ack offset - long baseOffset = offset - count + 1; - for (int i = 0; i < count; i++) { - long currOffset = baseOffset + i; - this.getAckCommitter(consumerGroupId).commitAck(currOffset); - } return ReplayPopResult.empty(); } @@ -166,12 +144,9 @@ nextVisibleTimestamp, buildReceiptHandleKey(topicId, queueId, operationId), TimerHandlerType.POP_REVIVE, buildReceiptHandle(consumerGroupId, topicId, queueId, operationId)); requestList.add(timerEnqueueRequest); - Integer currentConsumeTimes = metadata.getConsumeTimes().getOrDefault(offset, 0); - int newConsumeTimes = currentConsumeTimes + 1; - metadata.getConsumeTimes().put(offset, newConsumeTimes); - kvService.batch(requestList.toArray(new BatchRequest[0])); - return ReplayPopResult.of(newConsumeTimes); + // normal pop operation does not need to update consume times + return ReplayPopResult.of(1); } private ReplayPopResult replayPopRetryOperation(long operationOffset, @@ -190,18 +165,7 @@ private ReplayPopResult replayPopRetryOperation(long operationOffset, // update consume offset, data or retry stream ConsumerGroupMetadata metadata = this.consumerGroupMetadataMap.computeIfAbsent(consumerGroupId, k -> new ConsumerGroupMetadata(consumerGroupId)); - if (metadata.getRetryConsumeOffset() < offset + 1) { - metadata.setRetryConsumeOffset(offset + 1); - } - if (operation.isEndMark()) { - // if this is a pop-last operation, it only needs to update consume offset and advance ack offset - long baseOffset = offset - count + 1; - for (int i = 0; i < count; i++) { - long currOffset = baseOffset + i; - this.getAckCommitter(consumerGroupId).commitAck(currOffset); - } - return ReplayPopResult.empty(); - } + metadata.advanceRetryConsumeOffset(offset + 1); List requestList = new ArrayList<>(); // write a ck for this offset @@ -242,12 +206,7 @@ private ReplayPopResult replayPopFifoOperation(long operationOffset, metadata.setConsumeOffset(offset + 1); } if (operation.isEndMark()) { - // if this is a pop-last operation, it only needs to update consume offset and advance ack offset - long baseOffset = offset - count + 1; - for (int i = 0; i < count; i++) { - long currOffset = baseOffset + i; - this.getAckCommitter(consumerGroupId).commitAck(currOffset); - } + // if this is a pop-last operation, it only needs to update consume offset return ReplayPopResult.empty(); } @@ -269,46 +228,23 @@ nextVisibleTimestamp, buildReceiptHandleKey(topicId, queueId, operationId), long baseOffset = offset - count + 1; for (int i = 0; i < count; i++) { long currOffset = baseOffset + i; - BatchWriteRequest writeOrderIndexRequest = new BatchWriteRequest(KV_NAMESPACE_FIFO_INDEX, - buildOrderIndexKey(consumerGroupId, topicId, queueId, currOffset), buildOrderIndexValue(operationId)); + byte[] orderIndexKey = buildOrderIndexKey(consumerGroupId, topicId, queueId, currOffset); + int consumeTimes = 0; + try { + byte[] orderIndexValue = kvService.get(KV_NAMESPACE_FIFO_INDEX, orderIndexKey); + if (orderIndexValue != null) { + ByteBuffer wrappedBuffer = ByteBuffer.wrap(orderIndexValue); + consumeTimes = wrappedBuffer.getInt(8); + } + } catch (StoreException e) { + LOGGER.error("{}: get consume times from order index failed", identity, e); + } + BatchWriteRequest writeOrderIndexRequest = new BatchWriteRequest(KV_NAMESPACE_FIFO_INDEX, orderIndexKey, buildOrderIndexValue(operationId, consumeTimes + 1)); requestList.add(writeOrderIndexRequest); } - Integer currentConsumeTimes = metadata.getConsumeTimes().getOrDefault(offset, 0); - int newConsumeTimes = currentConsumeTimes + 1; - metadata.getConsumeTimes().put(offset, newConsumeTimes); - kvService.batch(requestList.toArray(new BatchRequest[0])); - return ReplayPopResult.of(newConsumeTimes); - } - - private AckCommitter getAckCommitter(long consumerGroupId) { - return getAckCommitter(consumerGroupId, null); - } - - private AckCommitter getAckCommitter(long consumerGroupId, ByteBuffer serializedBitmapBuffer) { - ConsumerGroupMetadata metadata = this.consumerGroupMetadataMap.computeIfAbsent(consumerGroupId, k -> new ConsumerGroupMetadata(consumerGroupId)); - return this.ackCommitterMap.computeIfAbsent(consumerGroupId, k -> - new AckCommitter( - metadata.getAckOffset(), - offset -> { - metadata.setAckOffset(offset); - this.ackOffsetListeners.forEach(listener -> listener.onOffset(consumerGroupId, offset)); - }, - serializedBitmapBuffer - )); - } - - private AckCommitter getRetryAckCommitter(long consumerGroupId) { - return getRetryAckCommitter(consumerGroupId, null); - } - - private AckCommitter getRetryAckCommitter(long consumerGroupId, ByteBuffer serializedBitmapBuffer) { - ConsumerGroupMetadata metadata = this.consumerGroupMetadataMap.computeIfAbsent(consumerGroupId, k -> new ConsumerGroupMetadata(consumerGroupId)); - return this.retryAckCommitterMap.computeIfAbsent(consumerGroupId, k -> new AckCommitter(metadata.getRetryAckOffset(), offset -> { - metadata.setRetryAckOffset(offset); - this.retryAckOffsetListeners.forEach(listener -> listener.onOffset(consumerGroupId, offset)); - }, serializedBitmapBuffer)); + return ReplayPopResult.of(consumeTimes(consumerGroupId, offset)); } @Override @@ -339,19 +275,32 @@ public void replayAckOperation(long operationOffset, AckOperation operation) thr throw new StoreException(StoreErrorCode.ILLEGAL_ARGUMENT, "Ack operation failed, check point not found"); } CheckPoint ck = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(ckValue)); - int count = ck.count(); - long baseOffset = ck.messageOffset() - count + 1; - for (int i = 0; i < count; i++) { - long currOffset = baseOffset + i; - if (ck.popOperationType() == PopOperation.PopOperationType.POP_NORMAL.ordinal() || - (ck.popOperationType() == PopOperation.PopOperationType.POP_ORDER.ordinal() && type == AckOperation.AckOperationType.ACK_NORMAL)) { - this.getAckCommitter(consumerGroupId).commitAck(currOffset); - } - if (ck.popOperationType() == PopOperation.PopOperationType.POP_RETRY.ordinal()) { - this.getRetryAckCommitter(consumerGroupId).commitAck(currOffset); + deleteCheckPointAndRewriteOrderIndex(ck); + + // Update ack offset + long ackOffset; + if (ck.popOperationType() == PopOperation.PopOperationType.POP_RETRY.ordinal()) { + ackOffset = metadata.getRetryConsumeOffset(); + } else { + ackOffset = metadata.getConsumeOffset(); + } + + byte[] prefix = kvService.getByPrefix(KV_NAMESPACE_CHECK_POINT, SerializeUtil.buildCheckPointGroupPrefix(topicId, queueId, consumerGroupId)); + if (prefix != null) { + CheckPoint earliestCK = SerializeUtil.decodeCheckPoint(ByteBuffer.wrap(prefix)); + ackOffset = earliestCK.messageOffset(); + } + + if (ck.popOperationType() == PopOperation.PopOperationType.POP_NORMAL.ordinal() || + (ck.popOperationType() == PopOperation.PopOperationType.POP_ORDER.ordinal() && type == AckOperation.AckOperationType.ACK_NORMAL)) { + metadata.advanceAckOffset(ackOffset); + } + if (ck.popOperationType() == PopOperation.PopOperationType.POP_RETRY.ordinal()) { + metadata.advanceRetryAckOffset(ackOffset); + for (OffsetListener listener : retryAckOffsetListeners) { + listener.onOffset(consumerGroupId, ackOffset); } } - deleteCheckPointAndRelatedStates(ck); } finally { reentrantLock.unlock(); } @@ -427,13 +376,9 @@ public void replayResetConsumeOffsetOperation(long operationOffset, ResetConsume // Create a new consumer group with a new version. ConsumerGroupMetadata metadata = this.consumerGroupMetadataMap.computeIfAbsent(consumerGroupId, k -> new ConsumerGroupMetadata(consumerGroupId)); ConsumerGroupMetadata newMetadata = new ConsumerGroupMetadata( - metadata.getConsumerGroupId(), newConsumeOffset, newConsumeOffset, metadata.getRetryConsumeOffset(), metadata.getRetryAckOffset(), - new ConcurrentSkipListMap<>(), operationOffset); + metadata.getConsumerGroupId(), newConsumeOffset, newConsumeOffset, metadata.getRetryConsumeOffset(), metadata.getRetryAckOffset(), operationOffset); this.consumerGroupMetadataMap.put(consumerGroupId, newMetadata); - // Remove old ack committer to avoid advance the ack offset to old group-metadata. - this.ackCommitterMap.remove(consumerGroupId); - // Delete all check points and related states about this consumer group List checkPoints = new ArrayList<>(); byte[] prefix = SerializeUtil.buildCheckPointGroupPrefix(topicId, queueId, consumerGroupId); @@ -458,11 +403,38 @@ private void deleteCheckPointsAndRelatedStates(List checkPointList) } } - private void deleteCheckPointAndRelatedStates(CheckPoint checkPoint) throws StoreException { - List batchRequests = deleteCheckPointAndRelatedStatesReqs(checkPoint); - if (!batchRequests.isEmpty()) { - kvService.batch(batchRequests.toArray(new BatchRequest[0])); + private void deleteCheckPointAndRewriteOrderIndex(CheckPoint checkPoint) throws StoreException { + List requestList = new ArrayList<>(); + + BatchDeleteRequest deleteCheckPointRequest = new BatchDeleteRequest(KV_NAMESPACE_CHECK_POINT, + buildCheckPointKey(checkPoint.topicId(), checkPoint.queueId(), checkPoint.consumerGroupId(), checkPoint.operationId())); + requestList.add(deleteCheckPointRequest); + + List timerCancelRequest = timerService.cancelRequest(checkPoint.nextVisibleTimestamp(), + buildReceiptHandleKey(checkPoint.topicId(), checkPoint.queueId(), checkPoint.operationId())); + requestList.addAll(timerCancelRequest); + + if (checkPoint.popOperationType() == PopOperation.PopOperationType.POP_ORDER.value()) { + long baseOffset = checkPoint.messageOffset() - checkPoint.count() + 1; + for (int i = 0; i < checkPoint.count(); i++) { + long currOffset = baseOffset + i; + byte[] orderIndexKey = buildOrderIndexKey(checkPoint.consumerGroupId(), checkPoint.topicId(), checkPoint.queueId(), currOffset); + int consumeTimes = 0; + try { + byte[] orderIndexValue = kvService.get(KV_NAMESPACE_FIFO_INDEX, orderIndexKey); + if (orderIndexValue != null) { + ByteBuffer wrappedBuffer = ByteBuffer.wrap(orderIndexValue); + consumeTimes = wrappedBuffer.getInt(8); + } + } catch (StoreException e) { + LOGGER.error("{}: get consume times from order index failed", identity, e); + } + BatchWriteRequest rewriteOrderIndex = new BatchWriteRequest(KV_NAMESPACE_FIFO_INDEX, orderIndexKey, buildOrderIndexValue(-1, consumeTimes)); + requestList.add(rewriteOrderIndex); + } } + + kvService.batch(requestList.toArray(new BatchRequest[0])); } private List deleteCheckPointAndRelatedStatesReqs(CheckPoint checkPoint) { @@ -525,8 +497,18 @@ private List writeCheckPointAndRelatedStatesReqs(CheckPoint checkP long baseOffset = checkPoint.messageOffset() - checkPoint.count() + 1; for (int i = 0; i < checkPoint.count(); i++) { long currOffset = baseOffset + i; - BatchWriteRequest writeOrderIndexRequest = new BatchWriteRequest(KV_NAMESPACE_FIFO_INDEX, - buildOrderIndexKey(checkPoint.consumerGroupId(), checkPoint.topicId(), checkPoint.queueId(), currOffset), buildOrderIndexValue(checkPoint.operationId())); + byte[] orderIndexKey = buildOrderIndexKey(checkPoint.consumerGroupId(), checkPoint.topicId(), checkPoint.queueId(), currOffset); + int consumeTimes = 0; + try { + byte[] orderIndexValue = kvService.get(KV_NAMESPACE_FIFO_INDEX, orderIndexKey); + if (orderIndexValue != null) { + ByteBuffer wrappedBuffer = ByteBuffer.wrap(orderIndexValue); + consumeTimes = wrappedBuffer.getInt(8); + } + } catch (StoreException e) { + LOGGER.error("{}: get consume times from order index failed", identity, e); + } + BatchWriteRequest writeOrderIndexRequest = new BatchWriteRequest(KV_NAMESPACE_FIFO_INDEX, orderIndexKey, buildOrderIndexValue(checkPoint.operationId(), consumeTimes + 1)); requestList.add(writeOrderIndexRequest); } } @@ -537,16 +519,13 @@ private List writeCheckPointAndRelatedStatesReqs(CheckPoint checkP public OperationSnapshot takeSnapshot() throws StoreException { exclusiveLock.lock(); try { - List metadataSnapshots = consumerGroupMetadataMap.values().stream().map(metadata -> { - return new OperationSnapshot.ConsumerGroupMetadataSnapshot(metadata.getConsumerGroupId(), metadata.getConsumeOffset(), metadata.getAckOffset(), - metadata.getRetryConsumeOffset(), metadata.getRetryAckOffset(), - getAckCommitter(metadata.getConsumerGroupId()).getSerializedBuffer().array(), - getRetryAckCommitter(metadata.getConsumerGroupId()).getSerializedBuffer().array(), - metadata.getConsumeTimes(), metadata.getVersion()); - }).collect(Collectors.toList()); + List metadataSnapshots = consumerGroupMetadataMap.values() + .stream() + .map(metadata -> new ConsumerGroupMetadata(metadata.getConsumerGroupId(), metadata.getConsumeOffset(), metadata.getAckOffset(), + metadata.getRetryConsumeOffset(), metadata.getRetryAckOffset(), metadata.getVersion())) + .collect(Collectors.toList()); long snapshotVersion = kvService.takeSnapshot(); - OperationSnapshot snapshot = new OperationSnapshot(currentOperationOffset, snapshotVersion, metadataSnapshots); - return snapshot; + return new OperationSnapshot(currentOperationOffset, snapshotVersion, metadataSnapshots); } finally { exclusiveLock.unlock(); } @@ -559,12 +538,7 @@ public void loadSnapshot(OperationSnapshot snapshot) { this.consumerGroupMetadataMap = snapshot.getConsumerGroupMetadataList().stream().collect(Collectors.toConcurrentMap( ConsumerGroupMetadata::getConsumerGroupId, metadataSnapshot -> new ConsumerGroupMetadata(metadataSnapshot.getConsumerGroupId(), metadataSnapshot.getConsumeOffset(), metadataSnapshot.getAckOffset(), - metadataSnapshot.getRetryConsumeOffset(), metadataSnapshot.getRetryAckOffset(), metadataSnapshot.getConsumeTimes(), - metadataSnapshot.getVersion()))); - snapshot.getConsumerGroupMetadataList().forEach(metadataSnapshot -> { - getAckCommitter(metadataSnapshot.getConsumerGroupId(), ByteBuffer.wrap(metadataSnapshot.getAckOffsetBitmapBuffer())); - getRetryAckCommitter(metadataSnapshot.getConsumerGroupId(), ByteBuffer.wrap(metadataSnapshot.getRetryAckOffsetBitmapBuffer())); - }); + metadataSnapshot.getRetryConsumeOffset(), metadataSnapshot.getRetryAckOffset(), metadataSnapshot.getVersion()))); this.currentOperationOffset = snapshot.getSnapshotEndOffset(); } catch (Exception e) { Throwable cause = FutureUtil.cause(e); @@ -582,8 +556,6 @@ public void clear() throws StoreException { exclusiveLock.lock(); try { this.consumerGroupMetadataMap.clear(); - this.ackCommitterMap.clear(); - this.retryAckCommitterMap.clear(); this.currentOperationOffset = -1; List checkPointList = new ArrayList<>(); byte[] tqPrefix = SerializeUtil.buildCheckPointQueuePrefix(topicId, queueId); @@ -626,7 +598,15 @@ public boolean isLocked(long consumerGroupId, long offset) throws StoreException exclusiveLock.lock(); try { byte[] lockKey = buildOrderIndexKey(consumerGroupId, topicId, queueId, offset); - return kvService.get(KV_NAMESPACE_FIFO_INDEX, lockKey) != null; + byte[] value = kvService.get(KV_NAMESPACE_FIFO_INDEX, lockKey); + // If the message is acked or send to dead letter topic, the order index will be cleared. + if (value != null) { + long operationId = ByteBuffer.wrap(value).getLong(); + // If the operation id is positive, it means that the message is locked. + // If the message could be visible again, the ReviveService will set operation id to -1. + return operationId >= 0; + } + return false; } finally { exclusiveLock.unlock(); } @@ -634,58 +614,22 @@ public boolean isLocked(long consumerGroupId, long offset) throws StoreException @Override public int consumeTimes(long consumerGroupId, long offset) { - return consumerGroupMetadataMap.computeIfAbsent(consumerGroupId, k -> new ConsumerGroupMetadata(consumerGroupId)) - .getConsumeTimes() - .getOrDefault(offset, 0); - } - - static class AckCommitter { - private final long baseOffset; - private volatile long ackOffset; - private final RoaringBitmap bitmap; - private final Consumer ackAdvanceFn; - - public AckCommitter(long ackOffset, Consumer ackAdvanceFn) { - this(ackOffset, ackAdvanceFn, null); - } - - public AckCommitter(long ackOffset, Consumer ackAdvanceFn, ByteBuffer serializedBitmap) { - this.ackOffset = ackOffset; - this.ackAdvanceFn = ackAdvanceFn; - // deserialize bitmap - if (serializedBitmap == null || !serializedBitmap.hasRemaining()) { - this.baseOffset = ackOffset; - this.bitmap = new RoaringBitmap(); - } else { - this.baseOffset = serializedBitmap.getLong(); - this.bitmap = new RoaringBitmap(new ImmutableRoaringBitmap(serializedBitmap)); - } - } - - public synchronized void commitAck(long offset) { - if (offset >= ackOffset) { - int offsetInBitmap = (int) (offset - baseOffset); - bitmap.add(offsetInBitmap); - boolean advance = false; - while (bitmap.contains((int) (ackOffset - baseOffset))) { - ackOffset++; - advance = true; - } - if (advance) { - ackAdvanceFn.accept(ackOffset); - } + byte[] orderIndexKey = buildOrderIndexKey(consumerGroupId, topicId, queueId, offset); + try { + byte[] orderIndexValue = kvService.get(KV_NAMESPACE_FIFO_INDEX, orderIndexKey); + if (orderIndexValue == null) { + return 1; } + ByteBuffer wrappedBuffer = ByteBuffer.wrap(orderIndexValue); + return wrappedBuffer.getInt(8); + } catch (StoreException e) { + LOGGER.error("{}: get consume times from order index failed", identity, e); } + return 1; + } - // / - public synchronized ByteBuffer getSerializedBuffer() { - int length = bitmap.serializedSizeInBytes() + Long.BYTES; - ByteBuffer buffer = ByteBuffer.allocate(length); - buffer.putLong(baseOffset); - bitmap.serialize(buffer); - // Flip buffer to prepare read - buffer.flip(); - return buffer; - } + @Override + public void registerRetryAckOffsetListener(OffsetListener listener) { + retryAckOffsetListeners.add(listener); } } diff --git a/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java b/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java index 171ec5968..0b7c3e45e 100644 --- a/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java +++ b/store/src/main/java/com/automq/rocketmq/store/queue/StreamLogicQueue.java @@ -90,6 +90,7 @@ public StreamLogicQueue(StoreConfig config, long topicId, int queueId, this.config = config; this.metadataService = metadataService; this.stateMachine = stateMachine; + stateMachine.registerRetryAckOffsetListener(this::onRetryAckOffsetAdvance); this.streamStore = streamStore; this.retryStreamIdMap = new ConcurrentHashMap<>(); this.operationLogService = operationLogService; @@ -138,18 +139,12 @@ public CompletableFuture open() { }) // recover from operation log .thenCompose(nil -> operationLogService.recover(stateMachine, operationStreamId, snapshotStreamId)) - .thenAccept(nil -> { - // register retry ack advance listener - this.stateMachine.registerRetryAckOffsetListener(this::onRetryAckOffsetAdvance); - state.set(State.OPENED); - }) .thenAccept(nil -> state.set(State.OPENED)); } return CompletableFuture.completedFuture(null); } private void onRetryAckOffsetAdvance(long consumerGroupId, long ackOffset) { - // TODO: add reclaim policy CompletableFuture retryStreamIdCf = retryStreamIdMap.get(consumerGroupId); if (retryStreamIdCf == null) { LOGGER.warn("Retry stream id not found for consumer group: {}", consumerGroupId); @@ -414,7 +409,6 @@ private CompletableFuture fetchMessages(StoreContext context, @Span .thenApply(fetchResult -> { AtomicLong fetchBytes = new AtomicLong(); - // TODO: Assume message count is always 1 in each batch for now. List resultList = fetchResult.recordBatchList() .stream() .map(batch -> { diff --git a/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java b/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java index d8ed269c2..15abe5d6b 100644 --- a/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java +++ b/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java @@ -139,6 +139,30 @@ public byte[] get(final String namespace, final byte[] key, } } + @Override + public byte[] getByPrefix(String namespace, final byte[] prefix) throws StoreException { + if (stopped) { + throw new StoreException(StoreErrorCode.KV_SERVICE_IS_NOT_RUNNING, "KV service is stopped."); + } + + if (!columnFamilyNameHandleMap.containsKey(namespace)) { + return null; + } + + ColumnFamilyHandle handle = columnFamilyNameHandleMap.get(namespace); + try (RocksIterator iterator = rocksDB.newIterator(handle)) { + iterator.seek(prefix); + if (!iterator.isValid()) { + return null; + } + byte[] key = iterator.key(); + if (!checkPrefix(key, prefix)) { + return null; + } + return iterator.value(); + } + } + private void transformKVReadOptions(ReadOptions readOptions, KVReadOptions kvReadOptions) { if (kvReadOptions.isSnapshotRead()) { readOptions.setSnapshot(snapshotMap.get(kvReadOptions.getSnapshotVersion())); diff --git a/store/src/main/java/com/automq/rocketmq/store/service/api/KVService.java b/store/src/main/java/com/automq/rocketmq/store/service/api/KVService.java index 2686938b5..dece8db3f 100644 --- a/store/src/main/java/com/automq/rocketmq/store/service/api/KVService.java +++ b/store/src/main/java/com/automq/rocketmq/store/service/api/KVService.java @@ -38,6 +38,8 @@ default byte[] get(final String namespace, final byte[] key) throws StoreExcepti return get(namespace, key, new KVReadOptions()); } + byte[] getByPrefix(final String namespace, final byte[] prefix) throws StoreException; + /** * Iterate all the k-v pairs. * diff --git a/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java b/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java index 221ee3fd4..cc6536e99 100644 --- a/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java +++ b/store/src/main/java/com/automq/rocketmq/store/util/SerializeUtil.java @@ -23,6 +23,7 @@ import com.automq.rocketmq.store.model.generated.CheckPoint; import com.automq.rocketmq.store.model.generated.OperationLogItem; import com.automq.rocketmq.store.model.generated.ReceiptHandle; +import com.automq.rocketmq.store.model.metadata.ConsumerGroupMetadata; import com.automq.rocketmq.store.model.operation.AckOperation; import com.automq.rocketmq.store.model.operation.ChangeInvisibleDurationOperation; import com.automq.rocketmq.store.model.operation.Operation; @@ -37,7 +38,6 @@ import java.util.ArrayList; import java.util.Base64; import java.util.List; -import java.util.concurrent.ConcurrentSkipListMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,8 +79,8 @@ public static byte[] buildCheckPointKey(long topicId, int queueId, long consumer return buffer.array(); } - public static byte[] buildCheckPointValue(long topicId, int queueId, long offset, int count, - long consumerGroupId, long operationId, PopOperation.PopOperationType operationType, long deliveryTimestamp, + public static byte[] buildCheckPointValue(long topicId, int queueId, long offset, int count, long consumerGroupId, + long operationId, PopOperation.PopOperationType operationType, long deliveryTimestamp, long nextVisibleTimestamp) { FlatBufferBuilder builder = new FlatBufferBuilder(); int root = CheckPoint.createCheckPoint(builder, topicId, queueId, offset, count, consumerGroupId, operationId, operationType.value(), deliveryTimestamp, nextVisibleTimestamp); @@ -108,9 +108,10 @@ public static byte[] buildOrderIndexKey(long consumerGroupId, long topicId, int } // - public static byte[] buildOrderIndexValue(long operationId) { - ByteBuffer buffer = ByteBuffer.allocate(8); + public static byte[] buildOrderIndexValue(long operationId, int consumeTimes) { + ByteBuffer buffer = ByteBuffer.allocate(12); buffer.putLong(operationId); + buffer.putInt(consumeTimes); return buffer.array(); } @@ -196,20 +197,10 @@ public static byte[] encodeOperationSnapshot(OperationSnapshot snapshot) { FlatBufferBuilder builder = new FlatBufferBuilder(); int[] consumerGroupMetadataOffsets = new int[snapshot.getConsumerGroupMetadataList().size()]; for (int i = 0; i < snapshot.getConsumerGroupMetadataList().size(); i++) { - OperationSnapshot.ConsumerGroupMetadataSnapshot consumerGroupMetadata = snapshot.getConsumerGroupMetadataList().get(i); - int ackOffsetBitmapOffset = builder.createByteVector(consumerGroupMetadata.getAckOffsetBitmapBuffer()); - int retryAckOffsetBitmapOffset = builder.createByteVector(consumerGroupMetadata.getRetryAckOffsetBitmapBuffer()); - List consumeTimesOffsets = new ArrayList<>(consumerGroupMetadata.getConsumeTimes().size()); - consumerGroupMetadata.getConsumeTimes().entrySet().forEach(entry -> { - int consumeTimeOffset = com.automq.rocketmq.store.model.generated.ConsumeTimes.createConsumeTimes(builder, entry.getKey(), entry.getValue()); - consumeTimesOffsets.add(consumeTimeOffset); - }); - int consumeTimesVectorOffset = com.automq.rocketmq.store.model.generated.ConsumerGroupMetadata.createConsumeTimesVector(builder, consumeTimesOffsets.stream().mapToInt(Integer::intValue).toArray()); + ConsumerGroupMetadata consumerGroupMetadata = snapshot.getConsumerGroupMetadataList().get(i); int consumerGroupMetadataOffset = com.automq.rocketmq.store.model.generated.ConsumerGroupMetadata.createConsumerGroupMetadata(builder, consumerGroupMetadata.getConsumerGroupId(), consumerGroupMetadata.getConsumeOffset(), consumerGroupMetadata.getAckOffset(), - ackOffsetBitmapOffset, consumerGroupMetadata.getRetryConsumeOffset(), consumerGroupMetadata.getRetryAckOffset(), - retryAckOffsetBitmapOffset, consumeTimesVectorOffset, consumerGroupMetadata.getVersion()); consumerGroupMetadataOffsets[i] = consumerGroupMetadataOffset; } @@ -233,22 +224,11 @@ public static OperationSnapshot decodeOperationSnapshot(ByteBuffer buffer) { } com.automq.rocketmq.store.model.generated.OperationSnapshot snapshot = com.automq.rocketmq.store.model.generated.OperationSnapshot.getRootAsOperationSnapshot(buffer); - List consumerGroupMetadataList = new ArrayList<>(snapshot.consumerGroupMetadatasLength()); + List consumerGroupMetadataList = new ArrayList<>(snapshot.consumerGroupMetadatasLength()); for (int i = 0; i < snapshot.consumerGroupMetadatasLength(); i++) { com.automq.rocketmq.store.model.generated.ConsumerGroupMetadata consumerGroupMetadata = snapshot.consumerGroupMetadatas(i); - byte[] ackBitMap = new byte[consumerGroupMetadata.ackBitMapLength()]; - consumerGroupMetadata.ackBitMapAsByteBuffer().get(ackBitMap); - byte[] retryAckBitMap = new byte[consumerGroupMetadata.retryAckBitMapLength()]; - ConcurrentSkipListMap consumeTimes = new ConcurrentSkipListMap<>(); - for (int j = 0; j < consumerGroupMetadata.consumeTimesLength(); j++) { - com.automq.rocketmq.store.model.generated.ConsumeTimes consumeTime = consumerGroupMetadata.consumeTimes(j); - consumeTimes.put(consumeTime.offset(), consumeTime.consumeTimes()); - } - consumerGroupMetadata.retryAckBitMapAsByteBuffer().get(retryAckBitMap); - consumerGroupMetadataList.add(new OperationSnapshot.ConsumerGroupMetadataSnapshot(consumerGroupMetadata.consumerGroupId(), - consumerGroupMetadata.consumeOffset(), consumerGroupMetadata.ackOffset(), consumerGroupMetadata.retryConsumeOffset(), consumerGroupMetadata.retryAckOffset(), - ackBitMap, retryAckBitMap, consumeTimes, - consumerGroupMetadata.version())); + consumerGroupMetadataList.add(new ConsumerGroupMetadata(consumerGroupMetadata.consumerGroupId(), consumerGroupMetadata.consumeOffset(), + consumerGroupMetadata.ackOffset(), consumerGroupMetadata.retryConsumeOffset(), consumerGroupMetadata.retryAckOffset(), consumerGroupMetadata.version())); } List checkPointList = new ArrayList<>(snapshot.checkPointsLength()); for (int i = 0; i < snapshot.checkPointsLength(); i++) { diff --git a/store/src/test/java/com/automq/rocketmq/store/LogicQueueTest.java b/store/src/test/java/com/automq/rocketmq/store/LogicQueueTest.java index 536281c98..ae7138d42 100644 --- a/store/src/test/java/com/automq/rocketmq/store/LogicQueueTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/LogicQueueTest.java @@ -515,7 +515,7 @@ void pop_fifo_filter_ack() { // 3. ack second message in result AckResult ackResult = logicQueue.ack(popMessageList.get(1).receiptHandle().get()).join(); assertEquals(AckResult.Status.SUCCESS, ackResult.status()); - assertEquals(0, stateMachine.ackOffset(CONSUMER_GROUP_ID)); + assertEquals(2, stateMachine.ackOffset(CONSUMER_GROUP_ID)); // 4. pop fifo again popResult = logicQueue.popFifo(StoreContext.EMPTY, CONSUMER_GROUP_ID, new TagFilter("TagB"), 1, 100).join(); diff --git a/store/src/test/java/com/automq/rocketmq/store/metadata/ConsumerGroupMetadataTest.java b/store/src/test/java/com/automq/rocketmq/store/metadata/ConsumerGroupMetadataTest.java index 592ba5b0c..57b0712f3 100644 --- a/store/src/test/java/com/automq/rocketmq/store/metadata/ConsumerGroupMetadataTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/metadata/ConsumerGroupMetadataTest.java @@ -30,29 +30,14 @@ public class ConsumerGroupMetadataTest { public void ack() { ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(CONSUMER_GROUP_ID); assertEquals(0L, metadata.getAckOffset()); - metadata.setAckOffset(1L); + metadata.advanceAckOffset(1L); assertEquals(1L, metadata.getAckOffset()); - // add consume times - metadata.getConsumeTimes().put(0L, 1); - metadata.getConsumeTimes().put(1L, 2); - metadata.getConsumeTimes().put(2L, 3); - metadata.getConsumeTimes().put(4L, 5); - metadata.getConsumeTimes().put(5L, 6); - metadata.getConsumeTimes().put(6L, 7); - - metadata.setAckOffset(3L); + metadata.advanceAckOffset(3L); assertEquals(3L, metadata.getAckOffset()); - assertEquals(3, metadata.getConsumeTimes().size()); - assertEquals(5, metadata.getConsumeTimes().get(4L)); - assertEquals(6, metadata.getConsumeTimes().get(5L)); - assertEquals(7, metadata.getConsumeTimes().get(6L)); - metadata.setAckOffset(6L); + metadata.advanceAckOffset(6L); assertEquals(6L, metadata.getAckOffset()); - assertEquals(1, metadata.getConsumeTimes().size()); - assertEquals(7, metadata.getConsumeTimes().get(6L)); - } diff --git a/store/src/test/java/com/automq/rocketmq/store/queue/AckCommitterTest.java b/store/src/test/java/com/automq/rocketmq/store/queue/AckCommitterTest.java deleted file mode 100644 index ad5368111..000000000 --- a/store/src/test/java/com/automq/rocketmq/store/queue/AckCommitterTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.rocketmq.store.queue; - -import java.nio.ByteBuffer; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.roaringbitmap.RoaringBitmap; -import org.roaringbitmap.buffer.ImmutableRoaringBitmap; - -public class AckCommitterTest { - - @Test - public void testGetAckBitmapBuffer() { - DefaultLogicQueueStateMachine.AckCommitter ackCommitter = new DefaultLogicQueueStateMachine.AckCommitter(100, - System.out::println); - ackCommitter.commitAck(102); - ackCommitter.commitAck(202); - ackCommitter.commitAck(302); - ByteBuffer buffer = ackCommitter.getSerializedBuffer(); - - Assertions.assertEquals(0, buffer.position()); - Assertions.assertTrue(buffer.limit() > 0); - - Assertions.assertEquals(100, buffer.getLong()); - - RoaringBitmap bitmap = new RoaringBitmap(new ImmutableRoaringBitmap(buffer)); - Assertions.assertTrue(bitmap.contains(2)); - Assertions.assertTrue(bitmap.contains(102)); - Assertions.assertTrue(bitmap.contains(202)); - } -} diff --git a/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java b/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java index 91b56885b..517662323 100644 --- a/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/service/ReviveServiceTest.java @@ -101,8 +101,8 @@ void revive_normal() throws StoreException { Mockito.doAnswer(ink -> { long consumerGroupId = ink.getArgument(1); assertEquals(CONSUMER_GROUP_ID, consumerGroupId); - FlatMessageExt flatMessageExt = ink.getArgument(2); - assertNotNull(flatMessageExt); + FlatMessage flatMessage = ink.getArgument(2); + assertNotNull(flatMessage); return CompletableFuture.completedFuture(null); }).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any(FlatMessage.class)); // mock max delivery attempts @@ -169,6 +169,7 @@ void revive_fifo() throws StoreException { // mock max delivery attempts Mockito.doReturn(CompletableFuture.completedFuture(2)) .when(metadataService).maxDeliveryAttemptsOf(Mockito.anyLong()); + // Append mock message. for (int i = 0; i < 2; i++) { FlatMessage message = FlatMessage.getRootAsFlatMessage(buildMessage(TOPIC_ID, QUEUE_ID, "TagA")); diff --git a/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java b/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java index 6b0cb0973..f716b9276 100644 --- a/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/util/SerializeUtilTest.java @@ -22,6 +22,7 @@ import com.automq.rocketmq.store.mock.MockMessageUtil; import com.automq.rocketmq.store.model.generated.CheckPoint; import com.automq.rocketmq.store.model.generated.ReceiptHandle; +import com.automq.rocketmq.store.model.metadata.ConsumerGroupMetadata; import com.automq.rocketmq.store.model.operation.AckOperation; import com.automq.rocketmq.store.model.operation.AckOperation.AckOperationType; import com.automq.rocketmq.store.model.operation.ChangeInvisibleDurationOperation; @@ -32,11 +33,8 @@ import com.automq.rocketmq.store.model.operation.ResetConsumeOffsetOperation; import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.ConcurrentSkipListMap; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.roaringbitmap.RoaringBitmap; -import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -113,8 +111,9 @@ void buildOrderIndexKey() { @Test void buildOrderIndexValue() { - byte[] value = SerializeUtil.buildOrderIndexValue(OPERATION_ID); - assertEquals(8, value.length); + byte[] value = SerializeUtil.buildOrderIndexValue(OPERATION_ID, QUEUE_ID); + assertEquals(12, value.length); + assertEquals(QUEUE_ID, ByteBuffer.wrap(value).getInt(8)); } @Test @@ -176,28 +175,8 @@ void encodeResetConsumerOffset() throws StoreException { @Test void encodeOperationSnapshot() { - ConcurrentSkipListMap consumeTimes = new ConcurrentSkipListMap<>(); - consumeTimes.put(1L, 2); - consumeTimes.put(3L, 4); - consumeTimes.put(5L, 6); - RoaringBitmap ackBitmap = new RoaringBitmap(); - ackBitmap.add(1, 3, 5); - int ackBitmapLength = ackBitmap.serializedSizeInBytes(); - ByteBuffer ackBitmapBuffer = ByteBuffer.allocate(ackBitmapLength); - ackBitmap.serialize(ackBitmapBuffer); - ackBitmapBuffer.flip(); - - RoaringBitmap retryAckBitmap = new RoaringBitmap(); - retryAckBitmap.add(2, 4, 6); - int retryAckBitmapLength = retryAckBitmap.serializedSizeInBytes(); - ByteBuffer retryAckBitmapBuffer = ByteBuffer.allocate(retryAckBitmapLength); - retryAckBitmap.serialize(retryAckBitmapBuffer); - retryAckBitmapBuffer.flip(); - - OperationSnapshot.ConsumerGroupMetadataSnapshot consumerGroupMetadataSnapshot = new OperationSnapshot.ConsumerGroupMetadataSnapshot( - CONSUMER_GROUP_ID, 1, 2, 3, 4, ackBitmapBuffer.array(), retryAckBitmapBuffer.array(), - consumeTimes, CONSUMER_GROUP_VERSION - ); + ConsumerGroupMetadata consumerGroupMetadataSnapshot = new ConsumerGroupMetadata( + CONSUMER_GROUP_ID, 1, 2, 3, 4, CONSUMER_GROUP_VERSION); byte[] checkPointValue = SerializeUtil.buildCheckPointValue(TOPIC_ID, QUEUE_ID, OFFSET, COUNT, CONSUMER_GROUP_ID, OPERATION_ID, POP_OPERATION_TYPE, DELIVERY_TIMESTAMP, NEXT_VISIBLE_TIMESTAMP); CheckPoint checkPoint = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(checkPointValue)); OperationSnapshot operationSnapshot = new OperationSnapshot( @@ -223,13 +202,6 @@ void encodeOperationSnapshot() { assertEquals(operationSnapshot.getCheckPoints().get(0).nextVisibleTimestamp(), decodedOperationSnapshot.getCheckPoints().get(0).nextVisibleTimestamp()); assertEquals(operationSnapshot.getConsumerGroupMetadataList(), decodedOperationSnapshot.getConsumerGroupMetadataList()); - byte[] decodedAckBitmapBuffer = decodedOperationSnapshot.getConsumerGroupMetadataList().get(0).getAckOffsetBitmapBuffer(); - RoaringBitmap decodedAckBitmap = new RoaringBitmap(new ImmutableRoaringBitmap(ByteBuffer.wrap(decodedAckBitmapBuffer))); - assertEquals(ackBitmap, decodedAckBitmap); - byte[] decodedRetryAckBitmapBuffer = decodedOperationSnapshot.getConsumerGroupMetadataList().get(0).getRetryAckOffsetBitmapBuffer(); - RoaringBitmap decodedRetryAckBitmap = new RoaringBitmap(new ImmutableRoaringBitmap(ByteBuffer.wrap(decodedRetryAckBitmapBuffer))); - assertEquals(retryAckBitmap, decodedRetryAckBitmap); - assertEquals(operationSnapshot.getConsumerGroupMetadataList().get(0).getConsumeTimes(), decodedOperationSnapshot.getConsumerGroupMetadataList().get(0).getConsumeTimes()); } @Test