Skip to content

Commit

Permalink
feat(store): write check point and timer tag to rocksdb atomically
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits committed Sep 14, 2023
1 parent fbad681 commit 7f8b855
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 17 deletions.
2 changes: 1 addition & 1 deletion store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.1.0</version>
<version>3.2.0</version>
<executions>
<execution>
<id>add-source</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.automq.rocketmq.store.MessageStore;
import com.automq.rocketmq.store.StreamStore;
import com.automq.rocketmq.store.model.generated.CheckPoint;
import com.automq.rocketmq.store.model.kv.BatchWriteRequest;
import com.automq.rocketmq.store.model.message.AckResult;
import com.automq.rocketmq.store.model.message.ChangeInvisibleDurationResult;
import com.automq.rocketmq.store.model.message.PopResult;
Expand Down Expand Up @@ -95,13 +96,13 @@ public PopResult pop(long consumeGroupId, long topicId, int queueId, long offset
// insert check point and timer tag into KVService
messageList.forEach(message -> {
try {
// put check point
kvService.put(KV_PARTITION_CHECK_POINT,
BatchWriteRequest writeCheckPointRequest = new BatchWriteRequest(KV_PARTITION_CHECK_POINT,
buildCheckPointKey(consumeGroupId, topicId, queueId, message.offset()),
buildCheckPointValue(serialNumber, deliveryTimestamp, invisibleDuration, topicId, queueId, message.offset(), consumeGroupId));
// put timer tag
kvService.put(KV_PARTITION_TIMER_TAG,
BatchWriteRequest writeTimerTagRequest = new BatchWriteRequest(KV_PARTITION_TIMER_TAG,
buildTimerTagKey(invisibleDuration, consumeGroupId, topicId, queueId, message.offset()), new byte[0]);
// write check point and timer tag to KV service atomically
kvService.writeBatch(writeCheckPointRequest, writeTimerTagRequest);
} catch (RocksDBException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.store.model.kv;

public record BatchWriteRequest(String partition, byte[] key, byte[] value) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package com.automq.rocketmq.store.model.callback;
package com.automq.rocketmq.store.model.kv;

/**
* Callback to execute once a key-value pair is read from KV store.
*/
public interface KVIteratorCallback {
public interface IteratorCallback {
/**
* Callback to execute once a key-value pair is read from KV store.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package com.automq.rocketmq.store.service;

import com.automq.rocketmq.store.model.callback.KVIteratorCallback;
import com.automq.rocketmq.store.model.kv.BatchWriteRequest;
import com.automq.rocketmq.store.model.kv.IteratorCallback;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.rocksdb.RocksDBException;
Expand All @@ -41,10 +42,10 @@ public interface KVService {
* Iterate all the k-v pairs.
*
* @param partition the partition storing required the k-v pair
* @param callback the iterator will call {@link KVIteratorCallback#onRead} to consume the kv pair
* @param callback the iterator will call {@link IteratorCallback#onRead} to consume the kv pair
* @throws RocksDBException if backend engine fails
*/
void iterate(final String partition, KVIteratorCallback callback) throws RocksDBException;
void iterate(final String partition, IteratorCallback callback) throws RocksDBException;

/**
* Iterate the k-v pair with the given prefix, start and end.
Expand All @@ -57,11 +58,11 @@ public interface KVService {
* @param prefix iterate the kv pair with the specified prefix
* @param start the lower bound to start iterate
* @param end the upper bound to end iterate
* @param callback the iterator will call {@link KVIteratorCallback#onRead} to consume the kv pair
* @param callback the iterator will call {@link IteratorCallback#onRead} to consume the kv pair
* @throws RocksDBException if backend engine fails
*/
void iterate(final String partition, String prefix, final String start,
final String end, KVIteratorCallback callback) throws RocksDBException;
final String end, IteratorCallback callback) throws RocksDBException;

/**
* Put the kv pair into the backend engine.
Expand All @@ -73,6 +74,15 @@ void iterate(final String partition, String prefix, final String start,
*/
void put(final String partition, byte[] key, byte[] value) throws RocksDBException;

/**
* Put the kv pair into the backend engine.
* The kv pair will be written into the backend engine in batch.
*
* @param writeRequests the batch builder
* @throws RocksDBException if backend engine fails
*/
void writeBatch(BatchWriteRequest... writeRequests) throws RocksDBException;

/**
* Delete value with specified key from backend kv engine.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package com.automq.rocketmq.store.service;

import com.automq.rocketmq.store.model.callback.KVIteratorCallback;
import com.automq.rocketmq.store.model.kv.BatchWriteRequest;
import com.automq.rocketmq.store.model.kv.IteratorCallback;
import com.google.common.base.Strings;
import java.io.File;
import java.util.ArrayList;
Expand All @@ -35,6 +36,8 @@
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class RocksDBKVService implements KVService {
private final String path;
Expand Down Expand Up @@ -102,7 +105,7 @@ private boolean checkPrefix(byte[] key, byte[] upperBound) {
}

@Override
public void iterate(final String partition, KVIteratorCallback callback) throws RocksDBException {
public void iterate(final String partition, IteratorCallback callback) throws RocksDBException {
if (stopped) {
throw new RocksDBException("KV service is stopped.");
}
Expand All @@ -125,7 +128,7 @@ public void iterate(final String partition, KVIteratorCallback callback) throws

@Override
public void iterate(final String partition, String prefix, final String start,
final String end, KVIteratorCallback callback) throws RocksDBException {
final String end, IteratorCallback callback) throws RocksDBException {
if (stopped) {
throw new RocksDBException("KV service is stopped.");
}
Expand Down Expand Up @@ -232,6 +235,20 @@ public void put(final String partition, byte[] key, byte[] value) throws RocksDB
rocksDB.put(handle, key, value);
}

@Override
public void writeBatch(BatchWriteRequest... writeRequests) throws RocksDBException {
if (stopped) {
throw new RocksDBException("KV service is stopped.");
}
try (WriteOptions writeOptions = new WriteOptions(); WriteBatch writeBatch = new WriteBatch()) {
for (BatchWriteRequest request : writeRequests) {
ColumnFamilyHandle handle = getOrCreateColumnFamily(request.partition());
writeBatch.put(handle, request.key(), request.value());
}
rocksDB.write(writeOptions, writeBatch);
}
}

@Override
public void delete(final String partition, byte[] key) throws RocksDBException {
if (stopped) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import com.automq.rocketmq.store.service.KVService;
import com.automq.rocketmq.store.service.RocksDBKVService;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.rocksdb.RocksDBException;
Expand All @@ -40,15 +43,20 @@ class MessageStoreTest {
private static MessageStore messageStore;

@BeforeAll
private static void setUp() throws RocksDBException {
public static void setUp() throws RocksDBException {
kvService = new RocksDBKVService(PATH);
messageStore = new MessageStoreImpl(null, kvService);
}

@AfterAll
public static void tearDown() throws RocksDBException {
kvService.destroy();
}

@Test
void pop() throws RocksDBException {
long testStartTime = System.nanoTime();
PopResult popResult = messageStore.pop(1, 1, 1, 1, 32, false, 100);
PopResult popResult = messageStore.pop(1, 1, 1, 0, 32, false, 100);
assertEquals(0, popResult.status());
assertFalse(popResult.messageList().isEmpty());

Expand All @@ -65,5 +73,31 @@ void pop() throws RocksDBException {
assertEquals(1, checkPoint.queueId());
assertEquals(message.offset(), checkPoint.offset());
}

messageStore.pop(2, 2, 2, 0, 32, false, 100);
messageStore.pop(1, 2, 2, 0, 32, false, 100);
messageStore.pop(1, 1, 2, 0, 32, false, 100);

List<CheckPoint> allCheckPointList = new ArrayList<>();
kvService.iterate(MessageStoreImpl.KV_PARTITION_CHECK_POINT, (key, value) ->
allCheckPointList.add(CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(value))));

assertEquals(4, allCheckPointList.size());

assertEquals(1, allCheckPointList.get(0).consumerGroupId());
assertEquals(1, allCheckPointList.get(0).topicId());
assertEquals(1, allCheckPointList.get(0).queueId());

assertEquals(1, allCheckPointList.get(1).consumerGroupId());
assertEquals(1, allCheckPointList.get(1).topicId());
assertEquals(2, allCheckPointList.get(1).queueId());

assertEquals(1, allCheckPointList.get(2).consumerGroupId());
assertEquals(2, allCheckPointList.get(2).topicId());
assertEquals(2, allCheckPointList.get(2).queueId());

assertEquals(2, allCheckPointList.get(3).consumerGroupId());
assertEquals(2, allCheckPointList.get(3).topicId());
assertEquals(2, allCheckPointList.get(3).queueId());
}
}

0 comments on commit 7f8b855

Please sign in to comment.