From 7f8b85593289b07568bb1f522762891483d6fce0 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 14 Sep 2023 19:49:29 +0800 Subject: [PATCH] feat(store): write check point and timer tag to rocksdb atomically Signed-off-by: SSpirits --- store/pom.xml | 2 +- .../rocketmq/store/impl/MessageStoreImpl.java | 9 +++-- .../store/model/kv/BatchWriteRequest.java | 21 ++++++++++ .../IteratorCallback.java} | 4 +- .../rocketmq/store/service/KVService.java | 20 +++++++--- .../store/service/RocksDBKVService.java | 23 +++++++++-- .../rocketmq/store/impl/MessageStoreTest.java | 38 ++++++++++++++++++- 7 files changed, 100 insertions(+), 17 deletions(-) create mode 100644 store/src/main/java/com/automq/rocketmq/store/model/kv/BatchWriteRequest.java rename store/src/main/java/com/automq/rocketmq/store/model/{callback/KVIteratorCallback.java => kv/IteratorCallback.java} (92%) diff --git a/store/pom.xml b/store/pom.xml index 43b5670d4..36ed2dfca 100644 --- a/store/pom.xml +++ b/store/pom.xml @@ -84,7 +84,7 @@ org.codehaus.mojo build-helper-maven-plugin - 3.1.0 + 3.2.0 add-source diff --git a/store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java b/store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java index 425c73d87..f7816cf93 100644 --- a/store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java +++ b/store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java @@ -21,6 +21,7 @@ import com.automq.rocketmq.store.MessageStore; import com.automq.rocketmq.store.StreamStore; import com.automq.rocketmq.store.model.generated.CheckPoint; +import com.automq.rocketmq.store.model.kv.BatchWriteRequest; import com.automq.rocketmq.store.model.message.AckResult; import com.automq.rocketmq.store.model.message.ChangeInvisibleDurationResult; import com.automq.rocketmq.store.model.message.PopResult; @@ -95,13 +96,13 @@ public PopResult pop(long consumeGroupId, long topicId, int queueId, long offset // insert check point and timer tag into KVService messageList.forEach(message -> { try { - // put check point - kvService.put(KV_PARTITION_CHECK_POINT, + BatchWriteRequest writeCheckPointRequest = new BatchWriteRequest(KV_PARTITION_CHECK_POINT, buildCheckPointKey(consumeGroupId, topicId, queueId, message.offset()), buildCheckPointValue(serialNumber, deliveryTimestamp, invisibleDuration, topicId, queueId, message.offset(), consumeGroupId)); - // put timer tag - kvService.put(KV_PARTITION_TIMER_TAG, + BatchWriteRequest writeTimerTagRequest = new BatchWriteRequest(KV_PARTITION_TIMER_TAG, buildTimerTagKey(invisibleDuration, consumeGroupId, topicId, queueId, message.offset()), new byte[0]); + // write check point and timer tag to KV service atomically + kvService.writeBatch(writeCheckPointRequest, writeTimerTagRequest); } catch (RocksDBException e) { throw new RuntimeException(e); } diff --git a/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchWriteRequest.java b/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchWriteRequest.java new file mode 100644 index 000000000..e8e97a486 --- /dev/null +++ b/store/src/main/java/com/automq/rocketmq/store/model/kv/BatchWriteRequest.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.store.model.kv; + +public record BatchWriteRequest(String partition, byte[] key, byte[] value) { +} diff --git a/store/src/main/java/com/automq/rocketmq/store/model/callback/KVIteratorCallback.java b/store/src/main/java/com/automq/rocketmq/store/model/kv/IteratorCallback.java similarity index 92% rename from store/src/main/java/com/automq/rocketmq/store/model/callback/KVIteratorCallback.java rename to store/src/main/java/com/automq/rocketmq/store/model/kv/IteratorCallback.java index 0a75c2f65..dce2fc76b 100644 --- a/store/src/main/java/com/automq/rocketmq/store/model/callback/KVIteratorCallback.java +++ b/store/src/main/java/com/automq/rocketmq/store/model/kv/IteratorCallback.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package com.automq.rocketmq.store.model.callback; +package com.automq.rocketmq.store.model.kv; /** * Callback to execute once a key-value pair is read from KV store. */ -public interface KVIteratorCallback { +public interface IteratorCallback { /** * Callback to execute once a key-value pair is read from KV store. * diff --git a/store/src/main/java/com/automq/rocketmq/store/service/KVService.java b/store/src/main/java/com/automq/rocketmq/store/service/KVService.java index 7d5eab787..049e1257c 100644 --- a/store/src/main/java/com/automq/rocketmq/store/service/KVService.java +++ b/store/src/main/java/com/automq/rocketmq/store/service/KVService.java @@ -17,7 +17,8 @@ package com.automq.rocketmq.store.service; -import com.automq.rocketmq.store.model.callback.KVIteratorCallback; +import com.automq.rocketmq.store.model.kv.BatchWriteRequest; +import com.automq.rocketmq.store.model.kv.IteratorCallback; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import org.rocksdb.RocksDBException; @@ -41,10 +42,10 @@ public interface KVService { * Iterate all the k-v pairs. * * @param partition the partition storing required the k-v pair - * @param callback the iterator will call {@link KVIteratorCallback#onRead} to consume the kv pair + * @param callback the iterator will call {@link IteratorCallback#onRead} to consume the kv pair * @throws RocksDBException if backend engine fails */ - void iterate(final String partition, KVIteratorCallback callback) throws RocksDBException; + void iterate(final String partition, IteratorCallback callback) throws RocksDBException; /** * Iterate the k-v pair with the given prefix, start and end. @@ -57,11 +58,11 @@ public interface KVService { * @param prefix iterate the kv pair with the specified prefix * @param start the lower bound to start iterate * @param end the upper bound to end iterate - * @param callback the iterator will call {@link KVIteratorCallback#onRead} to consume the kv pair + * @param callback the iterator will call {@link IteratorCallback#onRead} to consume the kv pair * @throws RocksDBException if backend engine fails */ void iterate(final String partition, String prefix, final String start, - final String end, KVIteratorCallback callback) throws RocksDBException; + final String end, IteratorCallback callback) throws RocksDBException; /** * Put the kv pair into the backend engine. @@ -73,6 +74,15 @@ void iterate(final String partition, String prefix, final String start, */ void put(final String partition, byte[] key, byte[] value) throws RocksDBException; + /** + * Put the kv pair into the backend engine. + * The kv pair will be written into the backend engine in batch. + * + * @param writeRequests the batch builder + * @throws RocksDBException if backend engine fails + */ + void writeBatch(BatchWriteRequest... writeRequests) throws RocksDBException; + /** * Delete value with specified key from backend kv engine. * diff --git a/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java b/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java index 60dad91b3..fd590de80 100644 --- a/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java +++ b/store/src/main/java/com/automq/rocketmq/store/service/RocksDBKVService.java @@ -17,7 +17,8 @@ package com.automq.rocketmq.store.service; -import com.automq.rocketmq.store.model.callback.KVIteratorCallback; +import com.automq.rocketmq.store.model.kv.BatchWriteRequest; +import com.automq.rocketmq.store.model.kv.IteratorCallback; import com.google.common.base.Strings; import java.io.File; import java.util.ArrayList; @@ -35,6 +36,8 @@ import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.rocksdb.Slice; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; public class RocksDBKVService implements KVService { private final String path; @@ -102,7 +105,7 @@ private boolean checkPrefix(byte[] key, byte[] upperBound) { } @Override - public void iterate(final String partition, KVIteratorCallback callback) throws RocksDBException { + public void iterate(final String partition, IteratorCallback callback) throws RocksDBException { if (stopped) { throw new RocksDBException("KV service is stopped."); } @@ -125,7 +128,7 @@ public void iterate(final String partition, KVIteratorCallback callback) throws @Override public void iterate(final String partition, String prefix, final String start, - final String end, KVIteratorCallback callback) throws RocksDBException { + final String end, IteratorCallback callback) throws RocksDBException { if (stopped) { throw new RocksDBException("KV service is stopped."); } @@ -232,6 +235,20 @@ public void put(final String partition, byte[] key, byte[] value) throws RocksDB rocksDB.put(handle, key, value); } + @Override + public void writeBatch(BatchWriteRequest... writeRequests) throws RocksDBException { + if (stopped) { + throw new RocksDBException("KV service is stopped."); + } + try (WriteOptions writeOptions = new WriteOptions(); WriteBatch writeBatch = new WriteBatch()) { + for (BatchWriteRequest request : writeRequests) { + ColumnFamilyHandle handle = getOrCreateColumnFamily(request.partition()); + writeBatch.put(handle, request.key(), request.value()); + } + rocksDB.write(writeOptions, writeBatch); + } + } + @Override public void delete(final String partition, byte[] key) throws RocksDBException { if (stopped) { diff --git a/store/src/test/java/com/automq/rocketmq/store/impl/MessageStoreTest.java b/store/src/test/java/com/automq/rocketmq/store/impl/MessageStoreTest.java index c82ce3753..21518b8ed 100644 --- a/store/src/test/java/com/automq/rocketmq/store/impl/MessageStoreTest.java +++ b/store/src/test/java/com/automq/rocketmq/store/impl/MessageStoreTest.java @@ -24,6 +24,9 @@ import com.automq.rocketmq.store.service.KVService; import com.automq.rocketmq.store.service.RocksDBKVService; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.rocksdb.RocksDBException; @@ -40,15 +43,20 @@ class MessageStoreTest { private static MessageStore messageStore; @BeforeAll - private static void setUp() throws RocksDBException { + public static void setUp() throws RocksDBException { kvService = new RocksDBKVService(PATH); messageStore = new MessageStoreImpl(null, kvService); } + @AfterAll + public static void tearDown() throws RocksDBException { + kvService.destroy(); + } + @Test void pop() throws RocksDBException { long testStartTime = System.nanoTime(); - PopResult popResult = messageStore.pop(1, 1, 1, 1, 32, false, 100); + PopResult popResult = messageStore.pop(1, 1, 1, 0, 32, false, 100); assertEquals(0, popResult.status()); assertFalse(popResult.messageList().isEmpty()); @@ -65,5 +73,31 @@ void pop() throws RocksDBException { assertEquals(1, checkPoint.queueId()); assertEquals(message.offset(), checkPoint.offset()); } + + messageStore.pop(2, 2, 2, 0, 32, false, 100); + messageStore.pop(1, 2, 2, 0, 32, false, 100); + messageStore.pop(1, 1, 2, 0, 32, false, 100); + + List allCheckPointList = new ArrayList<>(); + kvService.iterate(MessageStoreImpl.KV_PARTITION_CHECK_POINT, (key, value) -> + allCheckPointList.add(CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(value)))); + + assertEquals(4, allCheckPointList.size()); + + assertEquals(1, allCheckPointList.get(0).consumerGroupId()); + assertEquals(1, allCheckPointList.get(0).topicId()); + assertEquals(1, allCheckPointList.get(0).queueId()); + + assertEquals(1, allCheckPointList.get(1).consumerGroupId()); + assertEquals(1, allCheckPointList.get(1).topicId()); + assertEquals(2, allCheckPointList.get(1).queueId()); + + assertEquals(1, allCheckPointList.get(2).consumerGroupId()); + assertEquals(2, allCheckPointList.get(2).topicId()); + assertEquals(2, allCheckPointList.get(2).queueId()); + + assertEquals(2, allCheckPointList.get(3).consumerGroupId()); + assertEquals(2, allCheckPointList.get(3).topicId()); + assertEquals(2, allCheckPointList.get(3).queueId()); } } \ No newline at end of file