Skip to content

Commit

Permalink
feat(store): implement pop, ack, and changeInvisibleDuration (#37)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored and daniel-y committed Sep 27, 2023
1 parent 9447512 commit c2fbc32
Show file tree
Hide file tree
Showing 12 changed files with 526 additions and 86 deletions.
19 changes: 14 additions & 5 deletions store/src/main/fbs/store.fbs
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
namespace com.automq.rocketmq.store.model.generated;

table CheckPoint {
serial_number:long;
delivery_timestamp:long;
invisible_duration:long;
topic_id:long;
queue_id:long;
offset:long;
queue_id:int;
messge_offset:long;
consumer_group_id:long;
operation_id:long;
is_order:bool;
delivery_timestamp:long;
next_visible_timestamp:long;
reconsume_count:int;
}

table ReceiptHandle {
topic_id:long;
queue_id:int;
message_offset:long;
operation_id:long;
}
29 changes: 27 additions & 2 deletions store/src/main/java/com/automq/rocketmq/store/MessageStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,36 @@
import com.automq.rocketmq.store.model.message.PopResult;

public interface MessageStore {
PopResult pop(long consumeGroupId, long topicId, int queueId, long offset, int maxCount, boolean isOrder, long invisibleDuration);
/**
* Pop message from specified topic and queue.
*
* @param consumeGroupId consumer group id that launches this query
* @param topicId topic id to pop message from
* @param queueId queue id to pop message from
* @param offset offset to start from
* @param batchSize maximum count of messages
* @param isOrder is orderly pop
* @param invisibleDuration the duration for the next time this batch of messages will be visible, in nanoseconds
* @return pop result, see {@link PopResult}
*/
PopResult pop(long consumeGroupId, long topicId, int queueId, long offset, int batchSize, boolean isOrder, long invisibleDuration);

/**
* Ack message.
*
* @param receiptHandle unique receipt handle to identify inflight message
* @return ack result, see {@link AckResult}
*/
AckResult ack(String receiptHandle);

ChangeInvisibleDurationResult changeInvisibleDuration(String receiptHandle, int invisibleTime);
/**
* Change invisible duration.
*
* @param receiptHandle unique receipt handle to identify inflight message
* @param invisibleDuration the duration for the next time this batch of messages will be visible, in nanoseconds
* @return change invisible duration result, see {@link ChangeInvisibleDurationResult}
*/
ChangeInvisibleDurationResult changeInvisibleDuration(String receiptHandle, long invisibleDuration);

int getInflightStatsByQueue(long topicId, int queueId);

Expand Down
245 changes: 206 additions & 39 deletions store/src/main/java/com/automq/rocketmq/store/impl/MessageStoreImpl.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.store.model.kv;

public record BatchDeleteRequest(String partition, byte[] key) implements BatchRequest {
@Override
public byte[] value() {
return null;
}

@Override
public BatchRequestType type() {
return BatchRequestType.DELETE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.store.model.kv;

public interface BatchRequest {
String partition();

byte[] key();

byte[] value();

BatchRequestType type();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.store.model.kv;

public enum BatchRequestType {
WRITE,
DELETE,
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@

package com.automq.rocketmq.store.model.kv;

public record BatchWriteRequest(String partition, byte[] key, byte[] value) {
public record BatchWriteRequest(String partition, byte[] key, byte[] value) implements BatchRequest {
@Override
public BatchRequestType type() {
return BatchRequestType.WRITE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
import com.automq.rocketmq.common.model.Message;
import java.util.List;

public record PopResult(int status, long deliveryTimestamp, List<Message> messageList) {
public record PopResult(int status, long operationId, long deliveryTimestamp, List<Message> messageList) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.store.service;

import com.automq.rocketmq.store.model.kv.BatchWriteRequest;
import com.automq.rocketmq.store.model.kv.BatchRequest;
import com.automq.rocketmq.store.model.kv.IteratorCallback;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -75,13 +75,12 @@ void iterate(final String partition, String prefix, final String start,
void put(final String partition, byte[] key, byte[] value) throws RocksDBException;

/**
* Put the kv pair into the backend engine.
* The kv pair will be written into the backend engine in batch.
* Put or delete the kv pair in batch.
*
* @param writeRequests the batch builder
* @param requests the mutation requests
* @throws RocksDBException if backend engine fails
*/
void writeBatch(BatchWriteRequest... writeRequests) throws RocksDBException;
void batch(BatchRequest... requests) throws RocksDBException;

/**
* Delete value with specified key from backend kv engine.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.automq.rocketmq.store.service;

import com.automq.rocketmq.store.model.kv.BatchWriteRequest;
import com.automq.rocketmq.store.model.kv.BatchRequest;
import com.automq.rocketmq.store.model.kv.IteratorCallback;
import com.google.common.base.Strings;
import java.io.File;
Expand Down Expand Up @@ -236,14 +236,23 @@ public void put(final String partition, byte[] key, byte[] value) throws RocksDB
}

@Override
public void writeBatch(BatchWriteRequest... writeRequests) throws RocksDBException {
public void batch(BatchRequest... requests) throws RocksDBException {
if (stopped) {
throw new RocksDBException("KV service is stopped.");
}

if (requests == null) {
throw new RocksDBException("The requests can not be null.");
}

try (WriteOptions writeOptions = new WriteOptions(); WriteBatch writeBatch = new WriteBatch()) {
for (BatchWriteRequest request : writeRequests) {
for (BatchRequest request : requests) {
ColumnFamilyHandle handle = getOrCreateColumnFamily(request.partition());
writeBatch.put(handle, request.key(), request.value());
switch (request.type()) {
case WRITE -> writeBatch.put(handle, request.key(), request.value());
case DELETE -> writeBatch.delete(handle, request.key());
default -> throw new RocksDBException("Unsupported request type: " + request.type());
}
}
rocksDB.write(writeOptions, writeBatch);
}
Expand Down
38 changes: 38 additions & 0 deletions store/src/test/java/com/automq/rocketmq/store/KVServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.automq.rocketmq.store;

import com.automq.rocketmq.store.model.kv.BatchDeleteRequest;
import com.automq.rocketmq.store.model.kv.BatchWriteRequest;
import com.automq.rocketmq.store.service.KVService;
import com.automq.rocketmq.store.service.RocksDBKVService;
import java.io.File;
Expand Down Expand Up @@ -164,5 +166,41 @@ public void testIterate() throws IOException, RocksDBException {

store.destroy();
}

@Test
public void testBatch() throws IOException, RocksDBException {
String path = new File(PATH + UUID.randomUUID()).getCanonicalPath();
cleanUp(path);
KVService store = new RocksDBKVService(path);
assertNotNull(store);

assertThrowsExactly(RocksDBException.class, () -> store.batch(null));
store.batch(new BatchWriteRequest(PARTITION, "0".getBytes(), "0".getBytes()), new BatchWriteRequest(PARTITION, "1".getBytes(), "1".getBytes()));

AtomicInteger num = new AtomicInteger();
store.iterate(PARTITION, (key, value) -> {
String valueStr = new String(value, KVService.CHARSET);
String target = String.valueOf(num.getAndIncrement());

assertEquals(target, valueStr);
});
assertEquals(2, num.get());

store.batch(new BatchDeleteRequest(PARTITION, "0".getBytes()), new BatchWriteRequest(PARTITION, "2".getBytes(), "2".getBytes()));

num.set(1);
store.iterate(PARTITION, (key, value) -> {
String valueStr = new String(value, KVService.CHARSET);
String target = String.valueOf(num.getAndIncrement());

assertEquals(target, valueStr);
});
assertEquals(3, num.get());

store.batch(new BatchDeleteRequest(PARTITION, "1".getBytes()), new BatchDeleteRequest(PARTITION, "2".getBytes()));
num.set(0);
store.iterate(PARTITION, (key, value) -> num.getAndIncrement());
assertEquals(0, num.get());
}
}

Loading

0 comments on commit c2fbc32

Please sign in to comment.