Skip to content

Commit

Permalink
feat: retry invocations with metadata that are caused by internal err…
Browse files Browse the repository at this point in the history
…or or mocked (#600)

Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Nov 8, 2023
1 parent 1f3f1e7 commit bbc1808
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,52 +46,102 @@ public DefaultStoreMetadataService(MetadataStore metadataStore, S3MetadataServic

@Override
public CompletableFuture<StreamMetadata> dataStreamOf(long topicId, int queueId) {
return metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_DATA);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_DATA)
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<StreamMetadata> operationStreamOf(long topicId, int queueId) {
return metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_OPS);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_OPS)
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<StreamMetadata> snapshotStreamOf(long topicId, int queueId) {
return metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_SNAPSHOT);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_SNAPSHOT)
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<StreamMetadata> retryStreamOf(long consumerGroupId, long topicId, int queueId) {
return metadataStore.getStream(topicId, queueId, consumerGroupId, StreamRole.STREAM_ROLE_RETRY);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> metadataStore.getStream(topicId, queueId, consumerGroupId, StreamRole.STREAM_ROLE_RETRY)
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<Integer> maxDeliveryAttemptsOf(long consumerGroupId) {
return metadataStore.describeGroup(consumerGroupId, null).thenApply((ConsumerGroup::getMaxDeliveryAttempt));
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> metadataStore.describeGroup(consumerGroupId, null).thenApply((ConsumerGroup::getMaxDeliveryAttempt))
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset) {
return s3MetadataService.trimStream(streamId, streamEpoch, newStartOffset);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> s3MetadataService.trimStream(streamId, streamEpoch, newStartOffset)
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<StreamMetadata> openStream(long streamId, long streamEpoch) {
return metadataStore.openStream(streamId, streamEpoch, metadataStore.config().nodeId());
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> metadataStore.openStream(streamId, streamEpoch, metadataStore.config().nodeId())
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<Void> closeStream(long streamId, long streamEpoch) {
return metadataStore.closeStream(streamId, streamEpoch, metadataStore.config().nodeId());
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> metadataStore.closeStream(streamId, streamEpoch, metadataStore.config().nodeId())
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<List<StreamMetadata>> listOpenStreams() {
return metadataStore.listOpenStreams(metadataStore.config().nodeId());
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> metadataStore.listOpenStreams(metadataStore.config().nodeId())
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<Long> prepareS3Objects(int count, int ttlInMinutes) {
return s3MetadataService.prepareS3Objects(count, ttlInMinutes);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> s3MetadataService.prepareS3Objects(count, ttlInMinutes)
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
Expand Down Expand Up @@ -124,25 +174,45 @@ public CompletableFuture<Void> commitStreamObject(S3StreamObject streamObject, L

@Override
public CompletableFuture<List<S3WALObject>> listWALObjects() {
return s3MetadataService.listWALObjects();
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> s3MetadataService.listWALObjects()
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<List<S3WALObject>> listWALObjects(long streamId, long startOffset, long endOffset,
int limit) {
return s3MetadataService.listWALObjects(streamId, startOffset, endOffset, limit);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> s3MetadataService.listWALObjects(streamId, startOffset, endOffset, limit)
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<List<S3StreamObject>> listStreamObjects(long streamId, long startOffset, long endOffset,
int limit) {
return s3MetadataService.listStreamObjects(streamId, startOffset, endOffset, limit);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> s3MetadataService.listStreamObjects(streamId, startOffset, endOffset, limit)
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObjects(long streamId, long startOffset,
long endOffset, int limit) {
return s3MetadataService.listObjects(streamId, startOffset, endOffset, limit);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> s3MetadataService.listObjects(streamId, startOffset, endOffset, limit)
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}

@Override
Expand All @@ -160,6 +230,12 @@ public CompletableFuture<List<StreamMetadata>> getStreams(List<Long> streamIds)
if (streamIds == null || streamIds.isEmpty()) {
return CompletableFuture.completedFuture(List.of());
}
return metadataStore.getStreams(streamIds);

AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get, () -> metadataStore.getStreams(streamIds)
.thenApply(res -> {
loop.set(false);
return res;
}), MoreExecutors.directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public class Futures {
* @return A CompletableFuture that, when completed, indicates the loop terminated without any exception. If
* either the loopBody or condition throw/return Exceptions, these will be set as the result of this returned Future.
*/
public static CompletableFuture<Void> loop(Supplier<Boolean> condition, Supplier<CompletableFuture<Void>> loopBody,
public static <T> CompletableFuture<T> loop(Supplier<Boolean> condition, Supplier<CompletableFuture<T>> loopBody,
Executor executor) {
CompletableFuture<Void> result = new CompletableFuture<>();
Loop<Void> loop = new Loop<>(condition, loopBody, result, executor);
CompletableFuture<T> result = new CompletableFuture<>();
Loop<T> loop = new Loop<>(condition, loopBody, result, executor);
executor.execute(loop);
return result;
}
Expand Down
32 changes: 27 additions & 5 deletions metadata/src/main/java/com/automq/rocketmq/metadata/Loop.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package com.automq.rocketmq.metadata;

import apache.rocketmq.controller.v1.Code;
import com.automq.rocketmq.common.exception.ControllerException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.slf4j.Logger;
Expand All @@ -27,17 +30,19 @@ public class Loop<T> implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(Loop.class);

final Supplier<Boolean> condition;

final Supplier<CompletableFuture<T>> loopBody;

/**
* A CompletableFuture that will be completed, whether normally or exceptionally, when the loop completes.
* A CompletableFuture that will be completed when the loop completes.
*/
final CompletableFuture<Void> result;

final CompletableFuture<T> result;

final Executor executor;

public Loop(Supplier<Boolean> condition, Supplier<CompletableFuture<T>> loopBody, CompletableFuture<Void> result,
private boolean terminated;

public Loop(Supplier<Boolean> condition, Supplier<CompletableFuture<T>> loopBody, CompletableFuture<T> result,
Executor executor) {
this.condition = condition;
this.loopBody = loopBody;
Expand All @@ -51,14 +56,31 @@ public void run() {
}

void execute() {
if (terminated) {
LOGGER.debug("Loop has terminated");
return;
}

if (this.condition.get()) {
this.loopBody.get()
.exceptionally(e -> {
if (e.getCause() instanceof ControllerException ex) {
if (ex.getErrorCode() != Code.INTERNAL_VALUE && ex.getErrorCode() != Code.MOCK_FAILURE_VALUE) {
terminated = true;
result.completeExceptionally(e);
throw new CompletionException(e);
}
}

LOGGER.error("Unexpected exception raised", e);
return null;
}).thenApply(t -> {
if (null != result && !result.isDone() && !result.isCancelled()) {
result.complete(t);
}
return t;
}).thenRunAsync(this, executor);
} else {
result.complete(null);
LOGGER.debug("Loop completed");
}
}
Expand Down

0 comments on commit bbc1808

Please sign in to comment.