Skip to content

Commit

Permalink
feat(store): support trimming retry stream
Browse files Browse the repository at this point in the history
1. support trimming retry stream

Signed-off-by: TheR1sing3un <[email protected]>
  • Loading branch information
TheR1sing3un committed Nov 1, 2023
1 parent 53f539c commit 862549c
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.automq.rocketmq.store.service.RocksDBKVService;
import com.automq.rocketmq.store.service.SnapshotService;
import com.automq.rocketmq.store.service.StreamOperationLogService;
import com.automq.rocketmq.store.service.StreamReclaimService;
import com.automq.rocketmq.store.service.TimerService;
import com.automq.rocketmq.store.service.api.KVService;
import com.automq.rocketmq.store.service.api.OperationLogService;
Expand All @@ -49,10 +50,11 @@ public static MessageStoreImpl build(StoreConfig storeConfig, S3StreamConfig s3S
InflightService inflightService = new InflightService();
SnapshotService snapshotService = new SnapshotService(streamStore, kvService);
OperationLogService operationLogService = new StreamOperationLogService(streamStore, snapshotService, storeConfig);
StreamReclaimService streamReclaimService = new StreamReclaimService(streamStore);
// TODO: We may have multiple timer services in the future.
TimerService timerService = new TimerService("timer_tag_0", kvService);
LogicQueueManager logicQueueManager = new DefaultLogicQueueManager(storeConfig, streamStore, kvService, timerService,
metadataService, operationLogService, inflightService);
metadataService, operationLogService, inflightService, streamReclaimService);
ReviveService reviveService = new ReviveService(KV_NAMESPACE_CHECK_POINT, kvService, timerService,
metadataService, inflightService, logicQueueManager, deadLetterSender);
S3ObjectOperator objectOperator = new S3ObjectOperatorImpl(operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,13 @@ public interface MessageStateMachine {

int consumeTimes(long consumerGroupId, long offset);

/**
 * Registers a listener for ack offsets.
 * <p>
 * The listener receives the consumer group id and the offset through
 * {@link OffsetListener#onOffset(long, long)}.
 *
 * @param listener the listener to register
 */
void registerAckOffsetListener(OffsetListener listener);

/**
 * Registers a listener for retry ack offsets.
 * <p>
 * The listener receives the consumer group id and the offset through
 * {@link OffsetListener#onOffset(long, long)}.
 *
 * @param listener the listener to register
 */
void registerRetryAckOffsetListener(OffsetListener listener);

class ReplayPopResult {
private final int popTimes;

private ReplayPopResult(int popTimes) {
this.popTimes = popTimes;
}
Expand All @@ -74,4 +79,8 @@ public int getPopTimes() {
}
}

/**
 * Callback that receives an offset together with the consumer group it belongs to.
 * <p>
 * The interface has exactly one abstract method, so implementations may be supplied
 * as lambdas or method references.
 */
@FunctionalInterface
interface OffsetListener {
    /**
     * Invoked with an offset of a consumer group.
     *
     * @param consumerGroupId id of the consumer group the offset belongs to
     * @param offset          the reported offset
     */
    void onOffset(long consumerGroupId, long offset);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.message.TopicQueueId;
import com.automq.rocketmq.store.service.InflightService;
import com.automq.rocketmq.store.service.StreamReclaimService;
import com.automq.rocketmq.store.service.TimerService;
import com.automq.rocketmq.store.service.api.KVService;
import com.automq.rocketmq.store.service.api.OperationLogService;
Expand All @@ -51,20 +52,22 @@ public class DefaultLogicQueueManager implements LogicQueueManager {
private final StoreMetadataService metadataService;
private final OperationLogService operationLogService;
private final InflightService inflightService;
private final StreamReclaimService streamReclaimService;
private final ConcurrentMap<TopicQueueId, CompletableFuture<LogicQueue>> logicQueueMap;
private final String identity = "[DefaultLogicQueueManager]";

/**
 * Creates a logic queue manager that keeps references to the given collaborators
 * and starts with an empty logic queue map.
 * <p>
 * NOTE(review): the parameter list below appears to contain both the pre-change and
 * the post-change closing line as rendered by the diff view; only the second one
 * declares {@code streamReclaimService}, which the body assigns.
 *
 * @param storeConfig          store configuration
 * @param streamStore          stream store
 * @param kvService            KV service
 * @param timerService         timer service
 * @param metadataService      store metadata service
 * @param operationLogService  operation log service
 * @param inflightService      inflight service
 * @param streamReclaimService stream reclaim service
 */
public DefaultLogicQueueManager(StoreConfig storeConfig, StreamStore streamStore,
KVService kvService, TimerService timerService, StoreMetadataService metadataService,
OperationLogService operationLogService,
InflightService inflightService) {
InflightService inflightService, StreamReclaimService streamReclaimService) {
this.storeConfig = storeConfig;
this.streamStore = streamStore;
this.kvService = kvService;
this.timerService = timerService;
this.metadataService = metadataService;
this.operationLogService = operationLogService;
this.inflightService = inflightService;
this.streamReclaimService = streamReclaimService;
this.logicQueueMap = new ConcurrentHashMap<>();
}

Expand Down Expand Up @@ -154,7 +157,7 @@ public CompletableFuture<LogicQueue> createAndOpen(long topicId, int queueId) {

MessageStateMachine stateMachine = new DefaultLogicQueueStateMachine(topicId, queueId, kvService, timerService);
LogicQueue logicQueue = new StreamLogicQueue(storeConfig, topicId, queueId,
metadataService, stateMachine, streamStore, operationLogService, inflightService);
metadataService, stateMachine, streamStore, operationLogService, inflightService, streamReclaimService);

LOGGER.info("{}: Create and open logic queue success: topic: {} queue: {}", identity, topicId, queueId);
return logicQueue.open()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class DefaultLogicQueueStateMachine implements MessageStateMachine {
private final KVService kvService;
private final TimerService timerService;
private final String identity;
private final List<OffsetListener> ackOffsetListeners = new ArrayList<>();
private final List<OffsetListener> retryAckOffsetListeners = new ArrayList<>();

public DefaultLogicQueueStateMachine(long topicId, int queueId, KVService kvService, TimerService timerService) {
this.consumerGroupMetadataMap = new ConcurrentHashMap<>();
Expand All @@ -98,6 +100,16 @@ public int queueId() {
return queueId;
}

/**
 * Adds a listener to the list of ack offset listeners of this state machine.
 *
 * @param listener the listener to add; must not be {@code null}
 * @throws NullPointerException if {@code listener} is {@code null}
 */
@Override
public void registerAckOffsetListener(OffsetListener listener) {
    // Fail fast: a null entry would otherwise only blow up later, when the
    // listener list is iterated to deliver an offset.
    this.ackOffsetListeners.add(java.util.Objects.requireNonNull(listener, "listener"));
}

/**
 * Adds a listener to the list of retry ack offset listeners of this state machine.
 *
 * @param listener the listener to add; must not be {@code null}
 * @throws NullPointerException if {@code listener} is {@code null}
 */
@Override
public void registerRetryAckOffsetListener(OffsetListener listener) {
    // Fail fast: a null entry would otherwise only blow up later, when the
    // listener list is iterated to deliver an offset.
    this.retryAckOffsetListeners.add(java.util.Objects.requireNonNull(listener, "listener"));
}

@Override
public ReplayPopResult replayPopOperation(long operationOffset, PopOperation operation) throws StoreException {
reentrantLock.lock();
Expand Down Expand Up @@ -278,7 +290,10 @@ private AckCommitter getAckCommitter(long consumerGroupId) {

/**
 * Returns the ack committer of the given consumer group, creating it if none is mapped yet.
 * <p>
 * The group's metadata entry is created first if absent. A newly created committer is
 * constructed with the ack offset currently stored in that metadata, a callback, and the
 * given bitmap. The callback stores each offset it receives back into the metadata and
 * then passes it, together with the consumer group id, to every registered ack offset listener.
 *
 * @param consumerGroupId id of the consumer group
 * @param bitmap          bitmap handed to a newly created committer; unused when a committer is already mapped
 * @return the existing or newly created committer
 */
private AckCommitter getAckCommitter(long consumerGroupId, RoaringBitmap bitmap) {
ConsumerGroupMetadata metadata = this.consumerGroupMetadataMap.computeIfAbsent(consumerGroupId, k -> new ConsumerGroupMetadata(consumerGroupId));
// NOTE(review): presumably the pre-change statement as rendered by the diff view; the return below it is the version that notifies listeners.
return this.ackCommitterMap.computeIfAbsent(consumerGroupId, k -> new AckCommitter(metadata.getAckOffset(), metadata::setAckOffset, bitmap));
return this.ackCommitterMap.computeIfAbsent(consumerGroupId, k -> new AckCommitter(metadata.getAckOffset(), offset -> {
// update the stored offset before listeners are told about it
metadata.setAckOffset(offset);
this.ackOffsetListeners.forEach(listener -> listener.onOffset(consumerGroupId, offset));
}, bitmap));
}

private AckCommitter getRetryAckCommitter(long consumerGroupId) {
Expand All @@ -287,7 +302,10 @@ private AckCommitter getRetryAckCommitter(long consumerGroupId) {

/**
 * Returns the retry ack committer of the given consumer group, creating it if none is mapped yet.
 * <p>
 * The group's metadata entry is created first if absent. A newly created committer is
 * constructed with the retry ack offset currently stored in that metadata, a callback, and
 * the given bitmap. The callback stores each offset it receives back into the metadata and
 * then passes it, together with the consumer group id, to every registered retry ack offset listener.
 *
 * @param consumerGroupId id of the consumer group
 * @param bitmap          bitmap handed to a newly created committer; unused when a committer is already mapped
 * @return the existing or newly created committer
 */
private AckCommitter getRetryAckCommitter(long consumerGroupId, RoaringBitmap bitmap) {
ConsumerGroupMetadata metadata = this.consumerGroupMetadataMap.computeIfAbsent(consumerGroupId, k -> new ConsumerGroupMetadata(consumerGroupId));
// NOTE(review): presumably the pre-change statement as rendered by the diff view; the return below it is the version that notifies listeners.
return this.retryAckCommitterMap.computeIfAbsent(consumerGroupId, k -> new AckCommitter(metadata.getRetryAckOffset(), metadata::setRetryAckOffset, bitmap));
return this.retryAckCommitterMap.computeIfAbsent(consumerGroupId, k -> new AckCommitter(metadata.getRetryAckOffset(), offset -> {
// update the stored offset before listeners are told about it
metadata.setRetryAckOffset(offset);
this.retryAckOffsetListeners.forEach(listener -> listener.onOffset(consumerGroupId, offset));
}, bitmap));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import com.automq.rocketmq.store.model.operation.ResetConsumeOffsetOperation;
import com.automq.rocketmq.store.model.stream.SingleRecord;
import com.automq.rocketmq.store.service.InflightService;
import com.automq.rocketmq.store.service.StreamReclaimService;
import com.automq.rocketmq.store.service.api.OperationLogService;
import com.automq.rocketmq.store.util.SerializeUtil;
import com.automq.stream.utils.FutureUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -68,11 +70,13 @@ public class StreamLogicQueue extends LogicQueue {
private final StoreConfig config;
private final OperationLogService operationLogService;
private final InflightService inflightService;
private final StreamReclaimService streamReclaimService;
private final AtomicReference<State> state;

public StreamLogicQueue(StoreConfig config, long topicId, int queueId,
StoreMetadataService metadataService, MessageStateMachine stateMachine, StreamStore streamStore,
OperationLogService operationLogService, InflightService inflightService) {
OperationLogService operationLogService, InflightService inflightService,
StreamReclaimService streamReclaimService) {
super(topicId, queueId);
this.config = config;
this.metadataService = metadataService;
Expand All @@ -81,6 +85,7 @@ public StreamLogicQueue(StoreConfig config, long topicId, int queueId,
this.retryStreamIdMap = new ConcurrentHashMap<>();
this.operationLogService = operationLogService;
this.inflightService = inflightService;
this.streamReclaimService = streamReclaimService;
this.state = new AtomicReference<>(State.INIT);
}

Expand Down Expand Up @@ -124,11 +129,38 @@ public CompletableFuture<Void> open() {
})
// recover from operation log
.thenCompose(nil -> operationLogService.recover(stateMachine, operationStreamId, snapshotStreamId))
.thenAccept(nil -> {
// register retry ack advance listener
this.stateMachine.registerRetryAckOffsetListener(this::onRetryAckOffsetAdvance);
state.set(State.OPENED);
})
.thenAccept(nil -> state.set(State.OPENED));
}
return CompletableFuture.completedFuture(null);
}

/**
 * Retry ack offset listener of this queue: submits a reclaim task for the consumer
 * group's retry stream and logs how the task ended.
 * <p>
 * If no retry stream id future is mapped for the group, a warning is logged and nothing
 * is submitted. The method does not wait for the task; the outcome is only logged once
 * the returned future completes.
 *
 * @param consumerGroupId id of the consumer group whose retry ack offset was reported
 * @param ackOffset       the reported offset, passed to the reclaim task
 */
private void onRetryAckOffsetAdvance(long consumerGroupId, long ackOffset) {
    // TODO: add reclaim policy
    CompletableFuture<Long> streamIdFuture = retryStreamIdMap.get(consumerGroupId);
    if (streamIdFuture == null) {
        LOGGER.warn("Retry stream id not found for consumer group: {}", consumerGroupId);
        return;
    }

    StreamReclaimService.StreamReclaimTask reclaimTask =
        new StreamReclaimService.StreamReclaimTask(streamIdFuture, ackOffset);
    CompletableFuture<StreamReclaimService.StreamReclaimResult> reclaimFuture =
        streamReclaimService.addReclaimTask(reclaimTask);

    reclaimFuture
        .thenAccept(outcome -> {
            if (!outcome.success()) {
                LOGGER.warn("Aborted to reclaim consumerGroup: {} 's retry stream to new start offset: {}", consumerGroupId, ackOffset);
                return;
            }
            LOGGER.trace("Reclaim consumerGroup: {} 's retry stream to new start offset: {}", consumerGroupId, outcome.startOffset());
        })
        .exceptionally(error -> {
            // unwrap the failure before logging it
            Throwable rootCause = FutureUtil.cause(error);
            LOGGER.error("Failed to reclaim consumerGroup: {} 's retry stream to new start offset: {}", consumerGroupId, ackOffset, rootCause);
            return null;
        });
}

@Override
public CompletableFuture<Void> close() {
if (state.compareAndSet(State.OPENED, State.CLOSING)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.automq.rocketmq.store.service.api.KVService;
import com.automq.rocketmq.store.util.SerializeUtil;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.ThreadUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -37,6 +38,8 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -46,19 +49,22 @@

public class SnapshotService implements Lifecycle, Runnable {
public static final Logger LOGGER = LoggerFactory.getLogger(SnapshotService.class);

private final StreamStore streamStore;
private final BlockingQueue<SnapshotTask> snapshotTaskQueue;
private Thread snapshotTaker;
private final KVService kvService;
private volatile boolean stopped = false;
private final ConcurrentMap<TopicQueueId, SnapshotStatus> snapshotStatusMap = new ConcurrentHashMap<>();
private CompletableFuture<Void> runningCf;
private ExecutorService backgroundExecutor;

/**
 * Creates a snapshot service on top of the given stream store and KV service.
 * <p>
 * Also sets up the snapshot task queue with a capacity of 1024 and a single-thread
 * background executor whose thread factory is obtained from
 * {@code ThreadUtils.createThreadFactory("snapshot-background-executor", false)}.
 *
 * @param streamStore stream store kept by this service
 * @param kvService   KV service kept by this service
 */
public SnapshotService(StreamStore streamStore, KVService kvService) {
    // collaborators handed in by the caller
    this.kvService = kvService;
    this.streamStore = streamStore;

    // internally owned resources
    this.backgroundExecutor =
        Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("snapshot-background-executor", false));
    this.snapshotTaskQueue = new LinkedBlockingQueue<>(1024);
}

public static class SnapshotStatus {
Expand Down Expand Up @@ -87,6 +93,11 @@ public SnapshotStatus getSnapshotStatus(long topicId, int queueId) {
public void start() throws Exception {
this.stopped = false;
this.runningCf = new CompletableFuture<>();
if (this.backgroundExecutor == null || this.backgroundExecutor.isShutdown()) {
this.backgroundExecutor = Executors.newSingleThreadExecutor(
ThreadUtils.createThreadFactory("snapshot-background-executor", false)
);
}
this.snapshotTaker = new Thread(this, "snapshot-taker");
this.snapshotTaker.setDaemon(true);
this.snapshotTaker.start();
Expand All @@ -103,6 +114,11 @@ public void shutdown() throws Exception {
List<SnapshotTask> snapshotTasks = new ArrayList<>();
snapshotTaskQueue.drainTo(snapshotTasks);
snapshotTasks.forEach(SnapshotTask::abort);
// 3. shutdown background executor
if (this.backgroundExecutor != null) {
this.backgroundExecutor.shutdown();
this.backgroundExecutor = null;
}
}

@Override
Expand Down Expand Up @@ -173,14 +189,13 @@ CompletableFuture<Void> takeSnapshot(SnapshotTask task) {

// append snapshot to snapshot stream
return streamStore.append(snapshotStreamId, new SingleRecord(ByteBuffer.wrap(snapshotData)))
.thenCompose(appendResult -> {
.thenComposeAsync(appendResult -> {
// trim operation stream
return streamStore.trim(operationStreamId, snapshot.getSnapshotEndOffset() + 1)
.thenAccept(nil -> {
// complete snapshot task
task.completeSuccess(snapshot.getSnapshotEndOffset() + 1);
});
});
return streamStore.trim(operationStreamId, snapshot.getSnapshotEndOffset() + 1);
}, backgroundExecutor).thenAcceptAsync(nil -> {
// complete snapshot task
task.completeSuccess(snapshot.getSnapshotEndOffset() + 1);
}, backgroundExecutor);
}

CompletableFuture<TakeSnapshotResult> addSnapshotTask(SnapshotTask task) {
Expand Down
Loading

0 comments on commit 862549c

Please sign in to comment.