Skip to content

Commit

Permalink
feat: trim stream prior to deleting objects (#578)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
Signed-off-by: SSpirits <[email protected]>
Co-authored-by: SSpirits <[email protected]>
  • Loading branch information
lizhanhui and ShadowySpirits authored Nov 6, 2023
1 parent bf001d5 commit 5626710
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.util.Lifecycle;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.ControllerServiceImpl;
import com.automq.rocketmq.controller.server.MetadataStoreBuilder;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.DefaultProxyMetadataService;
import com.automq.rocketmq.metadata.DefaultStoreMetadataService;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
Expand Down Expand Up @@ -75,7 +75,7 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
MessageStoreImpl messageStore = MessageStoreBuilder.build(brokerConfig.store(), brokerConfig.s3Stream(), storeMetadataService, dlqService);
this.messageStore = messageStore;

DataStore dataStore = new DataStoreFacade(messageStore.getS3ObjectOperator(), messageStore.getTopicQueueManager());
DataStore dataStore = new DataStoreFacade(messageStore.streamStore(), messageStore.s3ObjectOperator(), messageStore.topicQueueManager());
metadataStore.setDataStore(dataStore);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ public interface DataStore {

CompletableFuture<Void> closeQueue(long topicId, int queueId);

/**
* Trim stream such that records with offsets prior to <code>offset</code> will be inaccessible.
*
* @param streamId ID of the stream to trim
* @param offset Minimum offset of the stream after trim operation
*/
CompletableFuture<Void> trimStream(long streamId, long offset);

/**
* Delete a list of S3 objects by object id.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.store.api.LogicQueue;
import com.automq.rocketmq.store.api.LogicQueueManager;
import com.automq.rocketmq.store.api.MessageStore;
import com.automq.rocketmq.store.api.S3ObjectOperator;
import com.automq.rocketmq.store.model.message.AckResult;
import com.automq.rocketmq.store.model.message.ChangeInvisibleDurationResult;
import com.automq.rocketmq.store.model.message.ClearRetryMessagesResult;
Expand All @@ -40,7 +38,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;

public class MockMessageStore implements MessageStore {
Expand All @@ -65,16 +62,6 @@ public void shutdown() {

}

@Override
public LogicQueueManager getTopicQueueManager() {
throw new NotImplementedException();
}

@Override
public S3ObjectOperator getS3ObjectOperator() {
throw new NotImplementedException();
}

@Override
public CompletableFuture<PopResult> pop(long consumerGroupId, long topicId, int queueId, Filter filter,
int batchSize, boolean fifo, boolean retry, long invisibleDuration) {
Expand Down
12 changes: 10 additions & 2 deletions store/src/main/java/com/automq/rocketmq/store/DataStoreFacade.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.store.api.LogicQueueManager;
import com.automq.rocketmq.store.api.S3ObjectOperator;
import com.automq.rocketmq.store.api.StreamStore;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DataStoreFacade implements DataStore {

private final StreamStore streamStore;
private final S3ObjectOperator s3ObjectOperator;

private final LogicQueueManager logicQueueManager;

public DataStoreFacade(S3ObjectOperator s3ObjectOperator, LogicQueueManager logicQueueManager) {
public DataStoreFacade(StreamStore streamStore, S3ObjectOperator s3ObjectOperator,
LogicQueueManager logicQueueManager) {
this.streamStore = streamStore;
this.s3ObjectOperator = s3ObjectOperator;
this.logicQueueManager = logicQueueManager;
}
Expand All @@ -39,6 +42,11 @@ public CompletableFuture<Void> closeQueue(long topicId, int queueId) {
return logicQueueManager.close(topicId, queueId);
}

@Override
public CompletableFuture<Void> trimStream(long streamId, long offset) {
return streamStore.trim(streamId, offset);
}

@Override
public CompletableFuture<List<Long>> batchDeleteS3Objects(List<Long> objectIds) {
return s3ObjectOperator.delete(objectIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ public MessageStoreImpl(StoreConfig config, StreamStore streamStore,
this.s3ObjectOperator = s3ObjectOperator;
}

@Override
public LogicQueueManager getTopicQueueManager() {
public LogicQueueManager topicQueueManager() {
return logicQueueManager;
}

Expand All @@ -93,8 +92,7 @@ public StreamStore streamStore() {
/**
* @return {@link S3ObjectOperator} instance
*/
@Override
public S3ObjectOperator getS3ObjectOperator() {
public S3ObjectOperator s3ObjectOperator() {
return s3ObjectOperator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@

public interface MessageStore extends Lifecycle {

LogicQueueManager getTopicQueueManager();

S3ObjectOperator getS3ObjectOperator();

/**
* Pop message from specified topic and queue.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void run() {
waitForRunning(config.periodicExporterIntervalInMills());

StreamStore streamStore = messageStore.streamStore();
DefaultLogicQueueManager manager = (DefaultLogicQueueManager) messageStore.getTopicQueueManager();
DefaultLogicQueueManager manager = (DefaultLogicQueueManager) messageStore.topicQueueManager();
Set<LagRecord> newLagRecordSet = Sets.newConcurrentHashSet();
manager.logicQueueMap().forEach((topicQueueId, logicQueueFuture) -> {
if (!logicQueueFuture.isDone() || logicQueueFuture.isCompletedExceptionally()) {
Expand Down Expand Up @@ -148,7 +148,6 @@ public void initStaticMetrics(Meter meter) {

@Override
public void initDynamicMetrics(Meter meter) {
StoreMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
consumerLagMessages = meter.gaugeBuilder(GAUGE_CONSUMER_LAG_MESSAGES)
.setDescription("Consumer lag messages")
.ofLongs()
Expand Down

0 comments on commit 5626710

Please sign in to comment.