Skip to content

Commit

Permalink
feat(store): introduce flatbuffer (#31)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored and daniel-y committed Sep 27, 2023
1 parent 7b91146 commit 2abd65a
Show file tree
Hide file tree
Showing 11 changed files with 326 additions and 10 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ jobs:
- uses: actions/checkout@v3
with:
submodules: true
- name: Install flatc
run: sudo bash install_flatc.sh
- uses: actions/setup-java@v3
with:
distribution: "temurin"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@

package com.automq.rocketmq.common.model;

public record Message() {
public record Message(long offset) {
}
28 changes: 28 additions & 0 deletions install_flatc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
set -e

if [ "$EUID" -ne 0 ]
then echo "To install dependencies, you need to run as root. Please try running with sudo: sudo $0"
fi

if [ ! -f /usr/local/bin/flatc ]; then
apt update
apt install -y unzip clang
arch=$(uname -m)
if [ "$arch" == "arm64" ]; then
echo "Host arch is arm64"
wget -O flatc.zip https://github.com/google/flatbuffers/releases/download/v23.5.26/Mac.flatc.binary.zip
elif [ "$arch" == "x86_64" ]; then
echo "Host arch is amd64"
wget -O flatc.zip https://github.com/google/flatbuffers/releases/download/v23.5.26/Linux.flatc.binary.clang++-12.zip
else
echo "Unsupported arch"
exit 1
fi
unzip flatc.zip
mv flatc /usr/local/bin/
rm flatc.zip
echo "install flatc successfully"
else
echo "flatc exists"
fi
61 changes: 60 additions & 1 deletion store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,17 @@

<artifactId>rocketmq-store</artifactId>

<properties>
<fbs.sources>${basedir}/src/main/fbs</fbs.sources>
<fbs.generated.sources>${project.build.directory}/generated-sources/java</fbs.generated.sources>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>${groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
<version>${version}</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
Expand All @@ -42,5 +48,58 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.google.flatbuffers</groupId>
<artifactId>flatbuffers-java</artifactId>
<version>23.5.26</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
<phase>generate-sources</phase>
</execution>
</executions>
<configuration>
<executable>flatc</executable>
<workingDirectory>${fbs.sources}</workingDirectory>
<arguments>
<argument>--java</argument>
<argument>-o</argument>
<argument>${fbs.generated.sources}</argument>
<argument>store.fbs</argument>
</arguments>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${fbs.generated.sources}</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
11 changes: 11 additions & 0 deletions store/src/main/fbs/store.fbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace com.automq.rocketmq.store.model.generated;

table CheckPoint {
serial_number:long;
delivery_timestamp:long;
invisible_duration:long;
topic_id:long;
queue_id:long;
offset:long;
consumer_group_id:long;
}
11 changes: 6 additions & 5 deletions store/src/main/java/com/automq/rocketmq/store/MessageStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package com.automq.rocketmq.store;

import com.automq.rocketmq.store.model.message.AckResult;
import com.automq.rocketmq.store.model.message.ChangeInvisibleTimeResult;
import com.automq.rocketmq.store.model.message.ChangeInvisibleDurationResult;
import com.automq.rocketmq.store.model.message.PopResult;
import java.util.UUID;

public interface MessageStore {
PopResult pop(UUID topicId, int queueId, long offset, int maxCount, UUID groupId, boolean isOrder);
PopResult pop(long consumeGroupId, long topicId, int queueId, long offset, int maxCount, boolean isOrder, long invisibleDuration);

AckResult ack(String receiptHandle);

ChangeInvisibleTimeResult changeInvisibleTime(String receiptHandle, int nextVisibleTime);
ChangeInvisibleDurationResult changeInvisibleDuration(String receiptHandle, int invisibleTime);

int getInflightStatsByQueue(UUID topicId, int queueId);
int getInflightStatsByQueue(long topicId, int queueId);

boolean cleanMetadata(long topicId, int queueId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.store.impl;

import com.automq.rocketmq.common.model.Message;
import com.automq.rocketmq.store.MessageStore;
import com.automq.rocketmq.store.StreamStore;
import com.automq.rocketmq.store.model.generated.CheckPoint;
import com.automq.rocketmq.store.model.message.AckResult;
import com.automq.rocketmq.store.model.message.ChangeInvisibleDurationResult;
import com.automq.rocketmq.store.model.message.PopResult;
import com.automq.rocketmq.store.service.KVService;
import com.google.flatbuffers.FlatBufferBuilder;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.rocksdb.RocksDBException;

public class MessageStoreImpl implements MessageStore {
protected static final String KV_PARTITION_CHECK_POINT = "check_point";
protected static final String KV_PARTITION_TIMER_TAG = "timer_tag";

private final StreamStore streamStore;
private final KVService kvService;

private final AtomicLong fakeSerialNumberGenerator = new AtomicLong();

public MessageStoreImpl(StreamStore streamStore, KVService kvService) {
this.streamStore = streamStore;
this.kvService = kvService;
}

//<groupId><topicId><queueId><offset>
protected static byte[] buildCheckPointKey(long consumeGroupId, long topicId, int queueId, long offset) {
ByteBuffer buffer = ByteBuffer.allocate(28);
buffer.putLong(0, consumeGroupId);
buffer.putLong(8, topicId);
buffer.putInt(16, queueId);
buffer.putLong(20, offset);
return buffer.array();
}

private static byte[] buildCheckPointValue(long serialNumber, long deliveryTimestamp, long invisibleDuration,
long topicId,
int queueId, long offset, long consumeGroupId) {
FlatBufferBuilder builder = new FlatBufferBuilder();
int root = CheckPoint.createCheckPoint(builder, serialNumber, deliveryTimestamp, invisibleDuration, topicId, queueId, offset, consumeGroupId);
builder.finish(root);
return builder.sizedByteArray();
}

// <invisibleTime><groupId><topicId><queueId><offset>
protected static byte[] buildTimerTagKey(long invisibleTime, long consumeGroupId, long topicId, int queueId,
long offset) {
ByteBuffer buffer = ByteBuffer.allocate(36);
buffer.putLong(0, invisibleTime);
buffer.putLong(8, consumeGroupId);
buffer.putLong(16, topicId);
buffer.putInt(24, queueId);
buffer.putLong(28, offset);
return buffer.array();
}

@Override
public PopResult pop(long consumeGroupId, long topicId, int queueId, long offset, int maxCount, boolean isOrder,
long invisibleDuration) {
// TODO: write this request to operation log and get the serial number
// serial number should be monotonically increasing
long serialNumber = fakeSerialNumberGenerator.getAndIncrement();

long deliveryTimestamp = System.nanoTime();

// TODO: fetch message from stream store
List<Message> messageList = new ArrayList<>();

// add mock message
messageList.add(new Message(0));

// insert check point and timer tag into KVService
messageList.forEach(message -> {
try {
// put check point
kvService.put(KV_PARTITION_CHECK_POINT,
buildCheckPointKey(consumeGroupId, topicId, queueId, message.offset()),
buildCheckPointValue(serialNumber, deliveryTimestamp, invisibleDuration, topicId, queueId, message.offset(), consumeGroupId));
// put timer tag
kvService.put(KV_PARTITION_TIMER_TAG,
buildTimerTagKey(invisibleDuration, consumeGroupId, topicId, queueId, message.offset()), new byte[0]);
} catch (RocksDBException e) {
throw new RuntimeException(e);
}
});

if (!isOrder) {
// TODO: commit consumer offset
}

return new PopResult(0, deliveryTimestamp, messageList);
}

@Override
public AckResult ack(String receiptHandle) {
// write this request to operation log

// delete check point and timer tag according to receiptHandle

return null;
}

@Override
public ChangeInvisibleDurationResult changeInvisibleDuration(String receiptHandle, int invisibleDuration) {
// write this request to operation log

// change invisibleTime in check point info and regenerate timer tag

return null;
}

@Override
public int getInflightStatsByQueue(long topicId, int queueId) {
// get check point count of specified topic and queue
return 0;
}

@Override
public boolean cleanMetadata(long topicId, int queueId) {
// clean all check points and timer tags of specified topic and queue
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@

package com.automq.rocketmq.store.model.message;

public record ChangeInvisibleTimeResult() {
public record ChangeInvisibleDurationResult() {
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
import com.automq.rocketmq.common.model.Message;
import java.util.List;

public record PopResult(int status, List<Message> messageList) {
public record PopResult(int status, long deliveryTimestamp, List<Message> messageList) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public interface KVService {
Charset CHARSET = StandardCharsets.UTF_8;

// TODO: Map RocksDBException into KVStoreException
// TODO: Map RocksDBException into StoreException

/**
* Get value with specified key from backend kv engine.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.store.impl;

import com.automq.rocketmq.common.model.Message;
import com.automq.rocketmq.store.MessageStore;
import com.automq.rocketmq.store.model.generated.CheckPoint;
import com.automq.rocketmq.store.model.message.PopResult;
import com.automq.rocketmq.store.service.KVService;
import com.automq.rocketmq.store.service.RocksDBKVService;
import java.nio.ByteBuffer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.rocksdb.RocksDBException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

class MessageStoreTest {
private static final String PATH = "/tmp/test_message_store/";

private static KVService kvService;
private static MessageStore messageStore;

@BeforeAll
private static void setUp() throws RocksDBException {
kvService = new RocksDBKVService(PATH);
messageStore = new MessageStoreImpl(null, kvService);
}

@Test
void pop() throws RocksDBException {
long testStartTime = System.nanoTime();
PopResult popResult = messageStore.pop(1, 1, 1, 1, 32, false, 100);
assertEquals(0, popResult.status());
assertFalse(popResult.messageList().isEmpty());

for (Message message : popResult.messageList()) {
byte[] bytes = kvService.get(MessageStoreImpl.KV_PARTITION_CHECK_POINT, MessageStoreImpl.buildCheckPointKey(1, 1, 1, message.offset()));
assertNotNull(bytes);

CheckPoint checkPoint = CheckPoint.getRootAsCheckPoint(ByteBuffer.wrap(bytes));
assertTrue(testStartTime < popResult.deliveryTimestamp());
assertEquals(popResult.deliveryTimestamp(), checkPoint.deliveryTimestamp());
assertEquals(100, checkPoint.invisibleDuration());
assertEquals(1, checkPoint.consumerGroupId());
assertEquals(1, checkPoint.topicId());
assertEquals(1, checkPoint.queueId());
assertEquals(message.offset(), checkPoint.offset());
}
}
}

0 comments on commit 2abd65a

Please sign in to comment.