From bbc18084531b7896f8a40db305164d7c2df87021 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Wed, 8 Nov 2023 16:33:15 +0800 Subject: [PATCH] feat: retry invocations with metadata that are caused by internal error or mocked (#600) Signed-off-by: Li Zhanhui --- .../metadata/DefaultStoreMetadataService.java | 106 +++++++++++++++--- .../com/automq/rocketmq/metadata/Futures.java | 6 +- .../com/automq/rocketmq/metadata/Loop.java | 32 +++++- 3 files changed, 121 insertions(+), 23 deletions(-) diff --git a/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java b/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java index 96e3a47fd..3c7ac46bd 100644 --- a/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java +++ b/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java @@ -46,52 +46,102 @@ public DefaultStoreMetadataService(MetadataStore metadataStore, S3MetadataServic @Override public CompletableFuture dataStreamOf(long topicId, int queueId) { - return metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_DATA); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_DATA) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture operationStreamOf(long topicId, int queueId) { - return metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_OPS); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_OPS) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture snapshotStreamOf(long topicId, int queueId) { - return metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_SNAPSHOT); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> metadataStore.getStream(topicId, queueId, null, StreamRole.STREAM_ROLE_SNAPSHOT) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture retryStreamOf(long consumerGroupId, long topicId, int queueId) { - return metadataStore.getStream(topicId, queueId, consumerGroupId, StreamRole.STREAM_ROLE_RETRY); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> metadataStore.getStream(topicId, queueId, consumerGroupId, StreamRole.STREAM_ROLE_RETRY) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture maxDeliveryAttemptsOf(long consumerGroupId) { - return metadataStore.describeGroup(consumerGroupId, null).thenApply((ConsumerGroup::getMaxDeliveryAttempt)); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> metadataStore.describeGroup(consumerGroupId, null).thenApply((ConsumerGroup::getMaxDeliveryAttempt)) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture trimStream(long streamId, long streamEpoch, long newStartOffset) { - return s3MetadataService.trimStream(streamId, streamEpoch, newStartOffset); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> s3MetadataService.trimStream(streamId, streamEpoch, newStartOffset) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture openStream(long streamId, long streamEpoch) { - return metadataStore.openStream(streamId, streamEpoch, metadataStore.config().nodeId()); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> metadataStore.openStream(streamId, streamEpoch, metadataStore.config().nodeId()) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture closeStream(long streamId, long streamEpoch) { - return metadataStore.closeStream(streamId, streamEpoch, metadataStore.config().nodeId()); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> metadataStore.closeStream(streamId, streamEpoch, metadataStore.config().nodeId()) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture> listOpenStreams() { - return metadataStore.listOpenStreams(metadataStore.config().nodeId()); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> metadataStore.listOpenStreams(metadataStore.config().nodeId()) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture prepareS3Objects(int count, int ttlInMinutes) { - return s3MetadataService.prepareS3Objects(count, ttlInMinutes); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> s3MetadataService.prepareS3Objects(count, ttlInMinutes) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override @@ -124,25 +174,45 @@ public CompletableFuture commitStreamObject(S3StreamObject streamObject, L @Override public CompletableFuture> listWALObjects() { - return s3MetadataService.listWALObjects(); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> s3MetadataService.listWALObjects() + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture> listWALObjects(long streamId, long startOffset, long endOffset, int limit) { - return s3MetadataService.listWALObjects(streamId, startOffset, endOffset, limit); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> s3MetadataService.listWALObjects(streamId, startOffset, endOffset, limit) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture> listStreamObjects(long streamId, long startOffset, long endOffset, int limit) { - return s3MetadataService.listStreamObjects(streamId, startOffset, endOffset, limit); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> s3MetadataService.listStreamObjects(streamId, startOffset, endOffset, limit) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override public CompletableFuture, List>> listObjects(long streamId, long startOffset, long endOffset, int limit) { - return s3MetadataService.listObjects(streamId, startOffset, endOffset, limit); + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> s3MetadataService.listObjects(streamId, startOffset, endOffset, limit) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } @Override @@ -160,6 +230,12 @@ public CompletableFuture> getStreams(List streamIds) if (streamIds == null || streamIds.isEmpty()) { return CompletableFuture.completedFuture(List.of()); } - return metadataStore.getStreams(streamIds); + + AtomicBoolean loop = new AtomicBoolean(true); + return Futures.loop(loop::get, () -> metadataStore.getStreams(streamIds) + .thenApply(res -> { + loop.set(false); + return res; + }), MoreExecutors.directExecutor()); } } diff --git a/metadata/src/main/java/com/automq/rocketmq/metadata/Futures.java b/metadata/src/main/java/com/automq/rocketmq/metadata/Futures.java index ed82cd901..c158be80b 100644 --- a/metadata/src/main/java/com/automq/rocketmq/metadata/Futures.java +++ b/metadata/src/main/java/com/automq/rocketmq/metadata/Futures.java @@ -32,10 +32,10 @@ public class Futures { * @return A CompletableFuture that, when completed, indicates the loop terminated without any exception. If * either the loopBody or condition throw/return Exceptions, these will be set as the result of this returned Future. */ - public static CompletableFuture loop(Supplier condition, Supplier> loopBody, + public static CompletableFuture loop(Supplier condition, Supplier> loopBody, Executor executor) { - CompletableFuture result = new CompletableFuture<>(); - Loop loop = new Loop<>(condition, loopBody, result, executor); + CompletableFuture result = new CompletableFuture<>(); + Loop loop = new Loop<>(condition, loopBody, result, executor); executor.execute(loop); return result; } diff --git a/metadata/src/main/java/com/automq/rocketmq/metadata/Loop.java b/metadata/src/main/java/com/automq/rocketmq/metadata/Loop.java index 3634e90a0..51ecb8e94 100644 --- a/metadata/src/main/java/com/automq/rocketmq/metadata/Loop.java +++ b/metadata/src/main/java/com/automq/rocketmq/metadata/Loop.java @@ -17,7 +17,10 @@ package com.automq.rocketmq.metadata; +import apache.rocketmq.controller.v1.Code; +import com.automq.rocketmq.common.exception.ControllerException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.function.Supplier; import org.slf4j.Logger; @@ -27,17 +30,19 @@ public class Loop implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(Loop.class); final Supplier condition; + final Supplier> loopBody; /** - * A CompletableFuture that will be completed, whether normally or exceptionally, when the loop completes. + * A CompletableFuture that will be completed when the loop completes. */ - final CompletableFuture result; - + final CompletableFuture result; final Executor executor; - public Loop(Supplier condition, Supplier> loopBody, CompletableFuture result, + private boolean terminated; + + public Loop(Supplier condition, Supplier> loopBody, CompletableFuture result, Executor executor) { this.condition = condition; this.loopBody = loopBody; @@ -51,14 +56,31 @@ public void run() { } void execute() { + if (terminated) { + LOGGER.debug("Loop has terminated"); + return; + } + if (this.condition.get()) { this.loopBody.get() .exceptionally(e -> { + if (e.getCause() instanceof ControllerException ex) { + if (ex.getErrorCode() != Code.INTERNAL_VALUE && ex.getErrorCode() != Code.MOCK_FAILURE_VALUE) { + terminated = true; + result.completeExceptionally(e); + throw new CompletionException(e); + } + } + LOGGER.error("Unexpected exception raised", e); return null; + }).thenApply(t -> { + if (null != result && !result.isDone() && !result.isCancelled()) { + result.complete(t); + } + return t; }).thenRunAsync(this, executor); } else { - result.complete(null); LOGGER.debug("Loop completed"); } }