diff --git a/store/src/main/fbs/store.fbs b/store/src/main/fbs/store.fbs index 59a1e9f07..19349e816 100644 --- a/store/src/main/fbs/store.fbs +++ b/store/src/main/fbs/store.fbs @@ -1,11 +1,20 @@ namespace com.automq.rocketmq.store.model.generated; table CheckPoint { - serial_number:long; - delivery_timestamp:long; - invisible_duration:long; topic_id:long; - queue_id:long; - offset:long; + queue_id:int; + messge_offset:long; consumer_group_id:long; + operation_id:long; + is_order:bool; + delivery_timestamp:long; + next_visible_timestamp:long; + reconsume_count:int; +} + +table ReceiptHandle { + topic_id:long; + queue_id:int; + message_offset:long; + operation_id:long; } diff --git a/store/src/main/java/com/automq/rocketmq/store/MessageStore.java b/store/src/main/java/com/automq/rocketmq/store/MessageStore.java index 2eb91b7fb..6b5622f5a 100644 --- a/store/src/main/java/com/automq/rocketmq/store/MessageStore.java +++ b/store/src/main/java/com/automq/rocketmq/store/MessageStore.java @@ -22,11 +22,36 @@ import com.automq.rocketmq.store.model.message.PopResult; public interface MessageStore { - PopResult pop(long consumeGroupId, long topicId, int queueId, long offset, int maxCount, boolean isOrder, long invisibleDuration); + /** + * Pop message from specified topic and queue. + * + * @param consumeGroupId consumer group id that launches this query + * @param topicId topic id to pop message from + * @param queueId queue id to pop message from + * @param offset offset to start from + * @param batchSize maximum count of messages + * @param isOrder is orderly pop + * @param invisibleDuration the duration for the next time this batch of messages will be visible, in nanoseconds + * @return pop result, see {@link PopResult} + */ + PopResult pop(long consumeGroupId, long topicId, int queueId, long offset, int batchSize, boolean isOrder, long invisibleDuration); + /** + * Ack message. + * + * @param receiptHandle unique receipt handle to identify inflight message + * @return ack result, see {@link AckResult} + */ AckResult ack(String receiptHandle); - ChangeInvisibleDurationResult changeInvisibleDuration(String receiptHandle, int invisibleTime); + /** + * Change invisible duration. + * + * @param receiptHandle unique receipt handle to identify inflight message + * @param invisibleDuration the duration for the next time this batch of messages will be visible, in nanoseconds + * @return change invisible duration result, see {@link ChangeInvisibleDurationResult} + */ + ChangeInvisibleDurationResult changeInvisibleDuration(String receiptHandle, long invisibleDuration); int getInflightStatsByQueue(long topicId, int queueId); diff --git a/store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java b/store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java index f7816cf93..63364a392 100644 --- a/store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java +++ b/store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java @@ -21,6 +21,9 @@ import com.automq.rocketmq.store.MessageStore; import com.automq.rocketmq.store.StreamStore; import com.automq.rocketmq.store.model.generated.CheckPoint; +import com.automq.rocketmq.store.model.generated.ReceiptHandle; +import com.automq.rocketmq.store.model.kv.BatchDeleteRequest; +import com.automq.rocketmq.store.model.kv.BatchRequest; import com.automq.rocketmq.store.model.kv.BatchWriteRequest; import com.automq.rocketmq.store.model.message.AckResult; import com.automq.rocketmq.store.model.message.ChangeInvisibleDurationResult; @@ -29,13 +32,17 @@ import com.google.flatbuffers.FlatBufferBuilder; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.rocksdb.RocksDBException; public class MessageStoreImpl implements MessageStore { protected static final String KV_PARTITION_CHECK_POINT = "check_point"; protected static final String KV_PARTITION_TIMER_TAG = "timer_tag"; + protected static final String KV_PARTITION_ORDER_INDEX = "order_index"; private final StreamStore streamStore; private final KVService kvService; @@ -47,88 +54,248 @@ public MessageStoreImpl(StreamStore streamStore, KVService kvService) { this.kvService = kvService; } - // - protected static byte[] buildCheckPointKey(long consumeGroupId, long topicId, int queueId, long offset) { + // + protected static byte[] buildCheckPointKey(long topicId, int queueId, long offset, long operationId) { ByteBuffer buffer = ByteBuffer.allocate(28); - buffer.putLong(0, consumeGroupId); - buffer.putLong(8, topicId); - buffer.putInt(16, queueId); - buffer.putLong(20, offset); + buffer.putLong(0, topicId); + buffer.putInt(8, queueId); + buffer.putLong(12, offset); + buffer.putLong(20, operationId); return buffer.array(); } - private static byte[] buildCheckPointValue(long serialNumber, long deliveryTimestamp, long invisibleDuration, - long topicId, - int queueId, long offset, long consumeGroupId) { + private static byte[] buildCheckPointValue(long topicId, int queueId, long offset, + long consumeGroupId, long operationId, boolean isOrder, long deliveryTimestamp, long invisibleDuration, + int reconsumeCount) { FlatBufferBuilder builder = new FlatBufferBuilder(); - int root = CheckPoint.createCheckPoint(builder, serialNumber, deliveryTimestamp, invisibleDuration, topicId, queueId, offset, consumeGroupId); + int root = CheckPoint.createCheckPoint(builder, topicId, queueId, offset, consumeGroupId, operationId, isOrder, deliveryTimestamp, invisibleDuration, reconsumeCount); builder.finish(root); return builder.sizedByteArray(); } - // - protected static byte[] buildTimerTagKey(long invisibleTime, long consumeGroupId, long topicId, int queueId, - long offset) { - ByteBuffer buffer = ByteBuffer.allocate(36); - buffer.putLong(0, invisibleTime); - buffer.putLong(8, consumeGroupId); - buffer.putLong(16, topicId); - buffer.putInt(24, queueId); - buffer.putLong(28, offset); + // + private static byte[] buildTimerTagKey(long nextVisibleTimestamp, long topicId, int queueId, + long operationId) { + ByteBuffer buffer = ByteBuffer.allocate(28); + buffer.putLong(0, nextVisibleTimestamp); + buffer.putLong(8, topicId); + buffer.putInt(16, queueId); + buffer.putLong(20, operationId); + return buffer.array(); + } + + // + protected static byte[] buildOrderIndexKey(long consumeGroupId, long topicId, int queueId, long offset) { + ByteBuffer buffer = ByteBuffer.allocate(28); + buffer.putLong(0, consumeGroupId); + buffer.putLong(8, topicId); + buffer.putInt(16, queueId); + buffer.putLong(20, offset); + return buffer.array(); + } + + // + private static byte[] buildOrderIndexValue(long operationId) { + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.putLong(0, operationId); return buffer.array(); } @Override - public PopResult pop(long consumeGroupId, long topicId, int queueId, long offset, int maxCount, boolean isOrder, + public PopResult pop(long consumeGroupId, long topicId, int queueId, long offset, int batchSize, boolean isOrder, long invisibleDuration) { - // TODO: write this request to operation log and get the serial number - // serial number should be monotonically increasing - long serialNumber = fakeSerialNumberGenerator.getAndIncrement(); + // TODO: Write this request to operation log and get the serial number + // Serial number should be monotonically increasing for each queue + long operationId = fakeSerialNumberGenerator.getAndIncrement(); long deliveryTimestamp = System.nanoTime(); + long nextVisibleTime = deliveryTimestamp + invisibleDuration; - // TODO: fetch message from stream store + // TODO: fetch message and retry message from stream store List messageList = new ArrayList<>(); // add mock message messageList.add(new Message(0)); - // insert check point and timer tag into KVService - messageList.forEach(message -> { + // If pop orderly, check whether the message is already consumed. + Map orderCheckPointMap = new HashMap<>(); + if (isOrder) { + for (int i = 0; i < batchSize; i++) { + try { + // TODO: Undefined behavior if last operation is not orderly. + byte[] orderIndexKey = buildOrderIndexKey(consumeGroupId, topicId, queueId, offset + i); + byte[] bytes = kvService.get(KV_PARTITION_ORDER_INDEX, orderIndexKey); + // If order index not found, this message has not been consumed. + if (bytes == null) { + continue; + } + long lastOperationId = ByteBuffer.wrap(bytes).getLong(); + byte[] checkPoint = kvService.get(KV_PARTITION_CHECK_POINT, buildCheckPointKey(topicId, queueId, offset + i, lastOperationId)); + if (checkPoint != null) { + orderCheckPointMap.put(offset + i, CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(checkPoint))); + } else { + // TODO: log finding a orphan index, this maybe a bug + kvService.delete(KV_PARTITION_ORDER_INDEX, orderIndexKey); + } + } catch (RocksDBException e) { + // TODO: handle exception + throw new RuntimeException(e); + } + } + } + + // Insert or renew check point and timer tag into KVService. + for (Message message : messageList) { try { + // If pop orderly, the message already consumed will not trigger writing new check point. + // But reconsume count should be increased. + if (isOrder && orderCheckPointMap.containsKey(message.offset())) { + // Delete last check point and timer tag. + CheckPoint lastCheckPoint = orderCheckPointMap.get(message.offset()); + BatchDeleteRequest deleteLastCheckPointRequest = new BatchDeleteRequest(KV_PARTITION_CHECK_POINT, + buildCheckPointKey(topicId, queueId, message.offset(), lastCheckPoint.operationId())); + + BatchDeleteRequest deleteLastTimerTagRequest = new BatchDeleteRequest(KV_PARTITION_TIMER_TAG, + buildTimerTagKey(lastCheckPoint.nextVisibleTimestamp(), topicId, queueId, lastCheckPoint.operationId())); + + // Write new check point, timer tag, and order index. + BatchWriteRequest writeCheckPointRequest = new BatchWriteRequest(KV_PARTITION_CHECK_POINT, + buildCheckPointKey(topicId, queueId, message.offset(), operationId), + buildCheckPointValue(topicId, queueId, message.offset(), consumeGroupId, operationId, true, deliveryTimestamp, nextVisibleTime, lastCheckPoint.reconsumeCount() + 1)); + + BatchWriteRequest writeTimerTagRequest = new BatchWriteRequest(KV_PARTITION_TIMER_TAG, + buildTimerTagKey(nextVisibleTime, topicId, queueId, message.offset()), new byte[0]); + + BatchWriteRequest writeOrderIndexRequest = new BatchWriteRequest(KV_PARTITION_ORDER_INDEX, + buildOrderIndexKey(consumeGroupId, topicId, queueId, message.offset()), buildOrderIndexValue(operationId)); + kvService.batch(deleteLastCheckPointRequest, deleteLastTimerTagRequest, writeCheckPointRequest, writeTimerTagRequest, writeOrderIndexRequest); + continue; + } + + // If this message is not orderly or has not been consumed, write check point and timer tag to KV service atomically. + List requestList = new ArrayList<>(); BatchWriteRequest writeCheckPointRequest = new BatchWriteRequest(KV_PARTITION_CHECK_POINT, - buildCheckPointKey(consumeGroupId, topicId, queueId, message.offset()), - buildCheckPointValue(serialNumber, deliveryTimestamp, invisibleDuration, topicId, queueId, message.offset(), consumeGroupId)); + buildCheckPointKey(topicId, queueId, message.offset(), operationId), + buildCheckPointValue(topicId, queueId, message.offset(), consumeGroupId, operationId, isOrder, deliveryTimestamp, nextVisibleTime, 0)); + requestList.add(writeCheckPointRequest); + BatchWriteRequest writeTimerTagRequest = new BatchWriteRequest(KV_PARTITION_TIMER_TAG, - buildTimerTagKey(invisibleDuration, consumeGroupId, topicId, queueId, message.offset()), new byte[0]); - // write check point and timer tag to KV service atomically - kvService.writeBatch(writeCheckPointRequest, writeTimerTagRequest); + buildTimerTagKey(nextVisibleTime, topicId, queueId, message.offset()), new byte[0]); + requestList.add(writeTimerTagRequest); + + // If this message is orderly, write order index to KV service. + if (isOrder) { + BatchWriteRequest writeOrderIndexRequest = new BatchWriteRequest(KV_PARTITION_ORDER_INDEX, + buildOrderIndexKey(consumeGroupId, topicId, queueId, message.offset()), buildOrderIndexValue(operationId)); + requestList.add(writeOrderIndexRequest); + } + kvService.batch(requestList.toArray(new BatchRequest[0])); } catch (RocksDBException e) { + // TODO: handle exception throw new RuntimeException(e); } - }); + } + // TODO: If not pop message orderly, commit consumer offset. if (!isOrder) { - // TODO: commit consumer offset } - return new PopResult(0, deliveryTimestamp, messageList); + return new PopResult(0, operationId, deliveryTimestamp, messageList); + } + + protected static String encodeReceiptHandle(long topicId, int queueId, long offset, long operationId) { + FlatBufferBuilder builder = new FlatBufferBuilder(); + int root = ReceiptHandle.createReceiptHandle(builder, topicId, queueId, offset, operationId); + builder.finish(root); + return new String(Base64.getEncoder().encode(builder.sizedByteArray())); + } + + private static ReceiptHandle decodeReceiptHandle(String receiptHandle) { + byte[] bytes = Base64.getDecoder().decode(receiptHandle); + return ReceiptHandle.getRootAsReceiptHandle(ByteBuffer.wrap(bytes)); } @Override public AckResult ack(String receiptHandle) { - // write this request to operation log + // TODO: Write this request to operation log and get the serial number - // delete check point and timer tag according to receiptHandle + // Delete check point and timer tag according to receiptHandle + ReceiptHandle handle = decodeReceiptHandle(receiptHandle); - return null; + try { + // Check if check point exists. + byte[] checkPointKey = buildCheckPointKey(handle.topicId(), handle.queueId(), handle.messageOffset(), handle.operationId()); + byte[] buffer = kvService.get(KV_PARTITION_CHECK_POINT, checkPointKey); + if (buffer == null) { + // TODO: Check point not found + return new AckResult(); + } + + // TODO: Data race between ack and revive. + CheckPoint checkPoint = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(buffer)); + + List requestList = new ArrayList<>(); + BatchDeleteRequest deleteCheckPointRequest = new BatchDeleteRequest(KV_PARTITION_CHECK_POINT, checkPointKey); + requestList.add(deleteCheckPointRequest); + + BatchDeleteRequest deleteTimerTagRequest = new BatchDeleteRequest(KV_PARTITION_TIMER_TAG, + buildTimerTagKey(checkPoint.nextVisibleTimestamp(), handle.topicId(), handle.queueId(), checkPoint.messgeOffset())); + requestList.add(deleteTimerTagRequest); + + // TODO: Check and commit consumer offset if pop message orderly + if (checkPoint.isOrder()) { + BatchDeleteRequest deleteOrderIndexRequest = new BatchDeleteRequest(KV_PARTITION_ORDER_INDEX, + buildOrderIndexKey(checkPoint.consumerGroupId(), handle.topicId(), handle.queueId(), checkPoint.messgeOffset())); + requestList.add(deleteOrderIndexRequest); + } + + kvService.batch(requestList.toArray(new BatchRequest[0])); + } catch (RocksDBException e) { + // TODO: handle exception + throw new RuntimeException(e); + } + + return new AckResult(); } @Override - public ChangeInvisibleDurationResult changeInvisibleDuration(String receiptHandle, int invisibleDuration) { - // write this request to operation log + public ChangeInvisibleDurationResult changeInvisibleDuration(String receiptHandle, long invisibleDuration) { + long nextInvisibleTimestamp = System.nanoTime() + invisibleDuration; + + // TODO: Write this request to operation log and get the serial number // change invisibleTime in check point info and regenerate timer tag + ReceiptHandle handle = decodeReceiptHandle(receiptHandle); + try { + // Check if check point exists. + byte[] checkPointKey = buildCheckPointKey(handle.topicId(), handle.queueId(), handle.messageOffset(), handle.operationId()); + byte[] buffer = kvService.get(KV_PARTITION_CHECK_POINT, checkPointKey); + if (buffer == null) { + // TODO: Check point not found + return new ChangeInvisibleDurationResult(); + } + + // Delete last timer tag. + CheckPoint checkPoint = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(buffer)); + + BatchDeleteRequest deleteLastTimerTagRequest = new BatchDeleteRequest(KV_PARTITION_TIMER_TAG, + buildTimerTagKey(checkPoint.nextVisibleTimestamp(), checkPoint.topicId(), checkPoint.queueId(), checkPoint.operationId())); + + // Write new check point and timer tag. + BatchWriteRequest writeCheckPointRequest = new BatchWriteRequest(KV_PARTITION_CHECK_POINT, + buildCheckPointKey(checkPoint.topicId(), checkPoint.queueId(), checkPoint.messgeOffset(), checkPoint.operationId()), + buildCheckPointValue(checkPoint.topicId(), checkPoint.queueId(), checkPoint.messgeOffset(), + checkPoint.consumerGroupId(), checkPoint.operationId(), checkPoint.isOrder(), + checkPoint.deliveryTimestamp(), nextInvisibleTimestamp, checkPoint.reconsumeCount())); + + BatchWriteRequest writeTimerTagRequest = new BatchWriteRequest(KV_PARTITION_TIMER_TAG, + buildTimerTagKey(nextInvisibleTimestamp, checkPoint.topicId(), checkPoint.queueId(), checkPoint.messgeOffset()), new byte[0]); + + kvService.batch(deleteLastTimerTagRequest, writeCheckPointRequest, writeTimerTagRequest); + } catch (RocksDBException e) { + // TODO: handle exception + throw new RuntimeException(e); + } return null; } diff --git a/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchDeleteRequest.java b/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchDeleteRequest.java new file mode 100644 index 000000000..8d9349904 --- /dev/null +++ b/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchDeleteRequest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.store.model.kv; + +public record BatchDeleteRequest(String partition, byte[] key) implements BatchRequest { + @Override + public byte[] value() { + return null; + } + + @Override + public BatchRequestType type() { + return BatchRequestType.DELETE; + } +} diff --git a/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchRequest.java b/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchRequest.java new file mode 100644 index 000000000..94bf2e15f --- /dev/null +++ b/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchRequest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.store.model.kv; + +public interface BatchRequest { + String partition(); + + byte[] key(); + + byte[] value(); + + BatchRequestType type(); +} diff --git a/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchRequestType.java b/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchRequestType.java new file mode 100644 index 000000000..3bb6e1997 --- /dev/null +++ b/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchRequestType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.store.model.kv; + +public enum BatchRequestType { + WRITE, + DELETE, +} diff --git a/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchWriteRequest.java b/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchWriteRequest.java index e8e97a486..c63e35622 100644 --- a/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchWriteRequest.java +++ b/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchWriteRequest.java @@ -17,5 +17,9 @@ package com.automq.rocketmq.store.model.kv; -public record BatchWriteRequest(String partition, byte[] key, byte[] value) { +public record BatchWriteRequest(String partition, byte[] key, byte[] value) implements BatchRequest { + @Override + public BatchRequestType type() { + return BatchRequestType.WRITE; + } } diff --git a/store/src/main/java/com/automq/rocketmq/store/model/message/PopResult.java b/store/src/main/java/com/automq/rocketmq/store/model/message/PopResult.java index 2024d716b..0a0aade53 100644 --- a/store/src/main/java/com/automq/rocketmq/store/model/message/PopResult.java +++ b/store/src/main/java/com/automq/rocketmq/store/model/message/PopResult.java @@ -20,5 +20,5 @@ import com.automq.rocketmq.common.model.Message; import java.util.List; -public record PopResult(int status, long deliveryTimestamp, List messageList) { +public record PopResult(int status, long operationId, long deliveryTimestamp, List messageList) { } diff --git a/store/src/main/java/com/automq/rocketmq/store/service/KVService.java b/store/src/main/java/com/automq/rocketmq/store/service/KVService.java index 049e1257c..d742fb00d 100644 --- a/store/src/main/java/com/automq/rocketmq/store/service/KVService.java +++ b/store/src/main/java/com/automq/rocketmq/store/service/KVService.java @@ -17,7 +17,7 @@ package com.automq.rocketmq.store.service; -import com.automq.rocketmq.store.model.kv.BatchWriteRequest; +import com.automq.rocketmq.store.model.kv.BatchRequest; import com.automq.rocketmq.store.model.kv.IteratorCallback; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -75,13 +75,12 @@ void iterate(final String partition, String prefix, final String start, void put(final String partition, byte[] key, byte[] value) throws RocksDBException; /** - * Put the kv pair into the backend engine. - * The kv pair will be written into the backend engine in batch. + * Put or delete the kv pair in batch. * - * @param writeRequests the batch builder + * @param requests the mutation requests * @throws RocksDBException if backend engine fails */ - void writeBatch(BatchWriteRequest... writeRequests) throws RocksDBException; + void batch(BatchRequest... requests) throws RocksDBException; /** * Delete value with specified key from backend kv engine. diff --git a/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java b/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java index fd590de80..fa9aaedf3 100644 --- a/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java +++ b/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java @@ -17,7 +17,7 @@ package com.automq.rocketmq.store.service; -import com.automq.rocketmq.store.model.kv.BatchWriteRequest; +import com.automq.rocketmq.store.model.kv.BatchRequest; import com.automq.rocketmq.store.model.kv.IteratorCallback; import com.google.common.base.Strings; import java.io.File; @@ -236,14 +236,23 @@ public void put(final String partition, byte[] key, byte[] value) throws RocksDB } @Override - public void writeBatch(BatchWriteRequest... writeRequests) throws RocksDBException { + public void batch(BatchRequest... requests) throws RocksDBException { if (stopped) { throw new RocksDBException("KV service is stopped."); } + + if (requests == null) { + throw new RocksDBException("The requests can not be null."); + } + try (WriteOptions writeOptions = new WriteOptions(); WriteBatch writeBatch = new WriteBatch()) { - for (BatchWriteRequest request : writeRequests) { + for (BatchRequest request : requests) { ColumnFamilyHandle handle = getOrCreateColumnFamily(request.partition()); - writeBatch.put(handle, request.key(), request.value()); + switch (request.type()) { + case WRITE -> writeBatch.put(handle, request.key(), request.value()); + case DELETE -> writeBatch.delete(handle, request.key()); + default -> throw new RocksDBException("Unsupported request type: " + request.type()); + } } rocksDB.write(writeOptions, writeBatch); } diff --git a/store/src/test/java/com/automq/rocketmq/store/KVServiceTest.java b/store/src/test/java/com/automq/rocketmq/store/KVServiceTest.java index 210e8a554..a12d524b0 100644 --- a/store/src/test/java/com/automq/rocketmq/store/KVServiceTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/KVServiceTest.java @@ -17,6 +17,8 @@ package com.automq.rocketmq.store; +import com.automq.rocketmq.store.model.kv.BatchDeleteRequest; +import com.automq.rocketmq.store.model.kv.BatchWriteRequest; import com.automq.rocketmq.store.service.KVService; import com.automq.rocketmq.store.service.RocksDBKVService; import java.io.File; @@ -164,5 +166,41 @@ public void testIterate() throws IOException, RocksDBException { store.destroy(); } + + @Test + public void testBatch() throws IOException, RocksDBException { + String path = new File(PATH + UUID.randomUUID()).getCanonicalPath(); + cleanUp(path); + KVService store = new RocksDBKVService(path); + assertNotNull(store); + + assertThrowsExactly(RocksDBException.class, () -> store.batch(null)); + store.batch(new BatchWriteRequest(PARTITION, "0".getBytes(), "0".getBytes()), new BatchWriteRequest(PARTITION, "1".getBytes(), "1".getBytes())); + + AtomicInteger num = new AtomicInteger(); + store.iterate(PARTITION, (key, value) -> { + String valueStr = new String(value, KVService.CHARSET); + String target = String.valueOf(num.getAndIncrement()); + + assertEquals(target, valueStr); + }); + assertEquals(2, num.get()); + + store.batch(new BatchDeleteRequest(PARTITION, "0".getBytes()), new BatchWriteRequest(PARTITION, "2".getBytes(), "2".getBytes())); + + num.set(1); + store.iterate(PARTITION, (key, value) -> { + String valueStr = new String(value, KVService.CHARSET); + String target = String.valueOf(num.getAndIncrement()); + + assertEquals(target, valueStr); + }); + assertEquals(3, num.get()); + + store.batch(new BatchDeleteRequest(PARTITION, "1".getBytes()), new BatchDeleteRequest(PARTITION, "2".getBytes())); + num.set(0); + store.iterate(PARTITION, (key, value) -> num.getAndIncrement()); + assertEquals(0, num.get()); + } } diff --git a/store/src/test/java/com/automq/rocketmq/store/impl/MessageStoreTest.java b/store/src/test/java/com/automq/rocketmq/store/impl/MessageStoreTest.java index 21518b8ed..410a84c23 100644 --- a/store/src/test/java/com/automq/rocketmq/store/impl/MessageStoreTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/impl/MessageStoreTest.java @@ -26,8 +26,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.rocksdb.RocksDBException; @@ -42,47 +43,44 @@ class MessageStoreTest { private static KVService kvService; private static MessageStore messageStore; - @BeforeAll - public static void setUp() throws RocksDBException { + @BeforeEach + public void setUp() throws RocksDBException { kvService = new RocksDBKVService(PATH); messageStore = new MessageStoreImpl(null, kvService); } - @AfterAll - public static void tearDown() throws RocksDBException { + @AfterEach + public void tearDown() throws RocksDBException { kvService.destroy(); } @Test void pop() throws RocksDBException { long testStartTime = System.nanoTime(); - PopResult popResult = messageStore.pop(1, 1, 1, 0, 32, false, 100); + PopResult popResult = messageStore.pop(1, 1, 1, 0, 1, false, 100); assertEquals(0, popResult.status()); assertFalse(popResult.messageList().isEmpty()); - for (Message message : popResult.messageList()) { - byte[] bytes = kvService.get(MessageStoreImpl.KV_PARTITION_CHECK_POINT, MessageStoreImpl.buildCheckPointKey(1, 1, 1, message.offset())); - assertNotNull(bytes); + Message message = popResult.messageList().get(0); + byte[] bytes = kvService.get(MessageStoreImpl.KV_PARTITION_CHECK_POINT, MessageStoreImpl.buildCheckPointKey(1, 1, 0, popResult.operationId())); + assertNotNull(bytes); - CheckPoint checkPoint = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(bytes)); - assertTrue(testStartTime < popResult.deliveryTimestamp()); - assertEquals(popResult.deliveryTimestamp(), checkPoint.deliveryTimestamp()); - assertEquals(100, checkPoint.invisibleDuration()); - assertEquals(1, checkPoint.consumerGroupId()); - assertEquals(1, checkPoint.topicId()); - assertEquals(1, checkPoint.queueId()); - assertEquals(message.offset(), checkPoint.offset()); - } + CheckPoint checkPoint = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(bytes)); + assertTrue(testStartTime < popResult.deliveryTimestamp()); + assertEquals(popResult.deliveryTimestamp(), checkPoint.deliveryTimestamp()); + assertEquals(popResult.deliveryTimestamp() + 100, checkPoint.nextVisibleTimestamp()); + assertEquals(1, checkPoint.consumerGroupId()); + assertEquals(1, checkPoint.topicId()); + assertEquals(1, checkPoint.queueId()); + assertEquals(message.offset(), checkPoint.messgeOffset()); - messageStore.pop(2, 2, 2, 0, 32, false, 100); - messageStore.pop(1, 2, 2, 0, 32, false, 100); - messageStore.pop(1, 1, 2, 0, 32, false, 100); + messageStore.pop(1, 1, 2, 0, 1, false, 100); List allCheckPointList = new ArrayList<>(); kvService.iterate(MessageStoreImpl.KV_PARTITION_CHECK_POINT, (key, value) -> allCheckPointList.add(CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(value)))); - assertEquals(4, allCheckPointList.size()); + assertEquals(2, allCheckPointList.size()); assertEquals(1, allCheckPointList.get(0).consumerGroupId()); assertEquals(1, allCheckPointList.get(0).topicId()); @@ -91,13 +89,123 @@ void pop() throws RocksDBException { assertEquals(1, allCheckPointList.get(1).consumerGroupId()); assertEquals(1, allCheckPointList.get(1).topicId()); assertEquals(2, allCheckPointList.get(1).queueId()); + } + + @Test + void popOrderly() throws RocksDBException { + PopResult popResult = messageStore.pop(1, 1, 1, 0, 1, true, 100); + assertEquals(0, popResult.status()); + assertFalse(popResult.messageList().isEmpty()); + + Message message = popResult.messageList().get(0); + byte[] bytes = kvService.get(MessageStoreImpl.KV_PARTITION_CHECK_POINT, MessageStoreImpl.buildCheckPointKey(1, 1, message.offset(), popResult.operationId())); + assertNotNull(bytes); + + CheckPoint checkPoint = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(bytes)); + assertTrue(checkPoint.isOrder()); + assertEquals(popResult.operationId(), checkPoint.operationId()); + assertEquals(popResult.deliveryTimestamp(), checkPoint.deliveryTimestamp()); + assertEquals(popResult.deliveryTimestamp() + 100, checkPoint.nextVisibleTimestamp()); + assertEquals(0, checkPoint.reconsumeCount()); + + bytes = kvService.get(MessageStoreImpl.KV_PARTITION_ORDER_INDEX, MessageStoreImpl.buildOrderIndexKey(1, 1, 1, message.offset())); + assertNotNull(bytes); + + assertEquals(popResult.operationId(), ByteBuffer.wrap(bytes).getLong()); + + // Pop the same message again. + popResult = messageStore.pop(1, 1, 1, 0, 1, true, 100); + + message = popResult.messageList().get(0); + bytes = kvService.get(MessageStoreImpl.KV_PARTITION_CHECK_POINT, MessageStoreImpl.buildCheckPointKey(1, 1, message.offset(), popResult.operationId())); + assertNotNull(bytes); + + checkPoint = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(bytes)); + assertTrue(checkPoint.isOrder()); + assertEquals(popResult.operationId(), checkPoint.operationId()); + assertEquals(popResult.deliveryTimestamp(), checkPoint.deliveryTimestamp()); + assertEquals(popResult.deliveryTimestamp() + 100, checkPoint.nextVisibleTimestamp()); + assertEquals(1, checkPoint.reconsumeCount()); + + bytes = kvService.get(MessageStoreImpl.KV_PARTITION_ORDER_INDEX, MessageStoreImpl.buildOrderIndexKey(1, 1, 1, message.offset())); + assertNotNull(bytes); + + assertEquals(popResult.operationId(), ByteBuffer.wrap(bytes).getLong()); + + AtomicInteger checkPointCount = new AtomicInteger(); + kvService.iterate(MessageStoreImpl.KV_PARTITION_CHECK_POINT, (key, value) -> checkPointCount.getAndIncrement()); + assertEquals(1, checkPointCount.get()); + + AtomicInteger orderIndexCount = new AtomicInteger(); + kvService.iterate(MessageStoreImpl.KV_PARTITION_ORDER_INDEX, (key, value) -> orderIndexCount.getAndIncrement()); + assertEquals(1, orderIndexCount.get()); + + AtomicInteger timerTagCount = new AtomicInteger(); + kvService.iterate(MessageStoreImpl.KV_PARTITION_TIMER_TAG, (key, value) -> timerTagCount.getAndIncrement()); + assertEquals(1, timerTagCount.get()); + } + + @Test + void ack() throws RocksDBException { + PopResult popResult = messageStore.pop(1, 1, 1, 0, 1, true, 100); + assertEquals(0, popResult.status()); + assertFalse(popResult.messageList().isEmpty()); + + Message message = popResult.messageList().get(0); + messageStore.ack(MessageStoreImpl.encodeReceiptHandle(1, 1, message.offset(), popResult.operationId())); + + AtomicInteger checkPointCount = new AtomicInteger(); + kvService.iterate(MessageStoreImpl.KV_PARTITION_CHECK_POINT, (key, value) -> checkPointCount.getAndIncrement()); + assertEquals(0, checkPointCount.get()); + + AtomicInteger orderIndexCount = new AtomicInteger(); + kvService.iterate(MessageStoreImpl.KV_PARTITION_ORDER_INDEX, (key, value) -> orderIndexCount.getAndIncrement()); + assertEquals(0, orderIndexCount.get()); + + AtomicInteger timerTagCount = new AtomicInteger(); + kvService.iterate(MessageStoreImpl.KV_PARTITION_TIMER_TAG, (key, value) -> timerTagCount.getAndIncrement()); + assertEquals(0, timerTagCount.get()); + } + + @Test + void changeInvisibleDuration() throws RocksDBException { + // Pop the message to generate the check point. + PopResult popResult = messageStore.pop(1, 1, 1, 0, 1, false, 100); + assertEquals(0, popResult.status()); + assertFalse(popResult.messageList().isEmpty()); + + byte[] checkPointKey = MessageStoreImpl.buildCheckPointKey(1, 1, 0, popResult.operationId()); + byte[] bytes = kvService.get(MessageStoreImpl.KV_PARTITION_CHECK_POINT, checkPointKey); + assertNotNull(bytes); + + CheckPoint checkPoint = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(bytes)); + + long popEndTimestamp = System.nanoTime(); + long lastVisibleTime = checkPoint.nextVisibleTimestamp(); + assertTrue(popEndTimestamp + 100 > checkPoint.nextVisibleTimestamp()); + + // Change the invisible duration. + Message message = popResult.messageList().get(0); + String receiptHandle = MessageStoreImpl.encodeReceiptHandle(1, 1, message.offset(), popResult.operationId()); + messageStore.changeInvisibleDuration(receiptHandle, 100_000_000_000L); + + bytes = kvService.get(MessageStoreImpl.KV_PARTITION_CHECK_POINT, checkPointKey); + assertNotNull(bytes); + + checkPoint = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(bytes)); + + assertTrue(lastVisibleTime < checkPoint.nextVisibleTimestamp()); + assertTrue(popEndTimestamp + 100 < checkPoint.nextVisibleTimestamp()); + + // Ack the message with the same receipt handle. + messageStore.ack(receiptHandle); - assertEquals(1, allCheckPointList.get(2).consumerGroupId()); - assertEquals(2, allCheckPointList.get(2).topicId()); - assertEquals(2, allCheckPointList.get(2).queueId()); + AtomicInteger checkPointCount = new AtomicInteger(); + kvService.iterate(MessageStoreImpl.KV_PARTITION_CHECK_POINT, (key, value) -> checkPointCount.getAndIncrement()); + assertEquals(0, checkPointCount.get()); - assertEquals(2, allCheckPointList.get(3).consumerGroupId()); - assertEquals(2, allCheckPointList.get(3).topicId()); - assertEquals(2, allCheckPointList.get(3).queueId()); + AtomicInteger timerTagCount = new AtomicInteger(); + kvService.iterate(MessageStoreImpl.KV_PARTITION_TIMER_TAG, (key, value) -> timerTagCount.getAndIncrement()); + assertEquals(0, timerTagCount.get()); } } \ No newline at end of file