Skip to content

Commit

Permalink
feat: retry completable future on exception (#599)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Nov 8, 2023
1 parent 635db0e commit 1f3f1e7
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.api.StoreMetadataService;
import com.automq.rocketmq.metadata.service.S3MetadataService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.tuple.Pair;

public class DefaultStoreMetadataService implements StoreMetadataService {
Expand Down Expand Up @@ -98,12 +100,26 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject, List<S3Str
// The underlying storage layer does not know the current node id when constructing the WAL object.
// So we should fill it here.
S3WALObject newWal = S3WALObject.newBuilder(walObject).setBrokerId(metadataStore.config().nodeId()).build();
return s3MetadataService.commitWalObject(newWal, streamObjects, compactedObjects);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get,
() -> s3MetadataService.commitWalObject(newWal, streamObjects, compactedObjects)
.thenApply(res -> {
loop.set(false);
return res;
}),
MoreExecutors.directExecutor());
}

@Override
public CompletableFuture<Void> commitStreamObject(S3StreamObject streamObject, List<Long> compactedObjects) {
return s3MetadataService.commitStreamObject(streamObject, compactedObjects);
AtomicBoolean loop = new AtomicBoolean(true);
return Futures.loop(loop::get,
() -> s3MetadataService.commitStreamObject(streamObject, compactedObjects)
.thenApply(res -> {
loop.set(false);
return res;
}),
MoreExecutors.directExecutor());
}

@Override
Expand Down
42 changes: 42 additions & 0 deletions metadata/src/main/java/com/automq/rocketmq/metadata/Futures.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.metadata;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

public class Futures {
/**
* Executes a loop using CompletableFutures, without invoking join()/get() on any of them or exclusively hogging a thread.
*
* @param condition A Supplier that indicates whether to proceed with the loop or not.
* @param loopBody A Supplier that returns a CompletableFuture which represents the body of the loop. This
* supplier is invoked every time the loopBody needs to execute.
* @param executor An Executor that is used to execute the condition and the loop support code.
* @return A CompletableFuture that, when completed, indicates the loop terminated without any exception. If
* either the loopBody or condition throw/return Exceptions, these will be set as the result of this returned Future.
*/
public static CompletableFuture<Void> loop(Supplier<Boolean> condition, Supplier<CompletableFuture<Void>> loopBody,
Executor executor) {
CompletableFuture<Void> result = new CompletableFuture<>();
Loop<Void> loop = new Loop<>(condition, loopBody, result, executor);
executor.execute(loop);
return result;
}
}
65 changes: 65 additions & 0 deletions metadata/src/main/java/com/automq/rocketmq/metadata/Loop.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.metadata;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Loop<T> implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(Loop.class);

final Supplier<Boolean> condition;
final Supplier<CompletableFuture<T>> loopBody;

/**
* A CompletableFuture that will be completed, whether normally or exceptionally, when the loop completes.
*/
final CompletableFuture<Void> result;


final Executor executor;

public Loop(Supplier<Boolean> condition, Supplier<CompletableFuture<T>> loopBody, CompletableFuture<Void> result,
Executor executor) {
this.condition = condition;
this.loopBody = loopBody;
this.result = result;
this.executor = executor;
}

@Override
public void run() {
execute();
}

void execute() {
if (this.condition.get()) {
this.loopBody.get()
.exceptionally(e -> {
LOGGER.error("Unexpected exception raised", e);
return null;
}).thenRunAsync(this, executor);
} else {
result.complete(null);
LOGGER.debug("Loop completed");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package com.automq.rocketmq.metadata;

import apache.rocketmq.controller.v1.Code;
import apache.rocketmq.controller.v1.S3StreamObject;
import apache.rocketmq.controller.v1.S3WALObject;
import apache.rocketmq.controller.v1.StreamMetadata;
import apache.rocketmq.controller.v1.StreamRole;
import com.automq.rocketmq.common.config.ControllerConfig;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.api.StoreMetadataService;
import com.automq.rocketmq.metadata.service.S3MetadataService;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -57,7 +59,8 @@ public void testCommitWalObject() {
int nodeId = 100;
when(metadataStore.config()).thenReturn(config);
when(config.nodeId()).thenReturn(nodeId);

when(s3MetadataService.commitWalObject(ArgumentMatchers.any(), ArgumentMatchers.anyList(), ArgumentMatchers.anyList()))
.thenReturn(CompletableFuture.completedFuture(null));
service.commitWalObject(walObject, new ArrayList<>(), new ArrayList<>());
// Verify the arguments passed to metadataStore.commitWalObject().
S3WALObject newWal = S3WALObject.newBuilder(walObject).setBrokerId(nodeId).build();
Expand All @@ -71,7 +74,7 @@ public void testGetStreamId() {
.setStreamId(1L).build();
future.complete(metadata);
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_DATA)))
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_DATA)))
.thenReturn(future);

DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore, s3MetadataService);
Expand All @@ -83,7 +86,7 @@ public void testGetStreamId_throws() {
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
future.completeExceptionally(new ControllerException(Code.NOT_FOUND_VALUE, "not found"));
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_DATA)))
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_DATA)))
.thenReturn(future);
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore, s3MetadataService);
CompletableFuture<StreamMetadata> streamCf = service.dataStreamOf(1L, 2);
Expand All @@ -99,7 +102,7 @@ public void testGetOperationLogStreamId() {
.setStreamId(1L).build();
future.complete(metadata);
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_OPS)))
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_OPS)))
.thenReturn(future);
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore, s3MetadataService);
Assertions.assertEquals(1L, service.operationStreamOf(1L, 2).join().getStreamId());
Expand All @@ -110,7 +113,7 @@ public void testGetOperationLogStreamId_throws() {
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
future.completeExceptionally(new ControllerException(Code.NOT_FOUND_VALUE, "not found"));
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_OPS)))
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_OPS)))
.thenReturn(future);
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore, s3MetadataService);
CompletableFuture<StreamMetadata> streamCf = service.operationStreamOf(1L, 2);
Expand All @@ -126,7 +129,7 @@ public void testGetRetryStreamId() {
.setStreamId(1L).build();
future.complete(metadata);
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_RETRY)))
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_RETRY)))
.thenReturn(future);
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore, s3MetadataService);
Assertions.assertEquals(1L, service.retryStreamOf(3L, 1L, 2).join().getStreamId());
Expand All @@ -137,7 +140,7 @@ public void testGetRetryStreamId_throws() {
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
future.completeExceptionally(new ControllerException(Code.NOT_FOUND_VALUE, "not found"));
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_RETRY)))
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_RETRY)))
.thenReturn(future);
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore, s3MetadataService);

Expand All @@ -147,4 +150,28 @@ public void testGetRetryStreamId_throws() {
Assertions.assertEquals(Code.NOT_FOUND_VALUE, exception.getErrorCode());
}

@Test
public void testRetryableCommitWalObject() {
when(metadataStore.config()).thenReturn(config);
when(config.nodeId()).thenReturn(1);
S3MetadataService metadataService = Mockito.mock(S3MetadataService.class);
Mockito.when(metadataService.commitWalObject(ArgumentMatchers.any(), ArgumentMatchers.anyList(), ArgumentMatchers.anyList()))
.thenReturn(CompletableFuture.failedFuture(new ControllerException(Code.MOCK_FAILURE_VALUE, "Mocked Failure")))
.thenReturn(CompletableFuture.completedFuture(null));
StoreMetadataService svc = new DefaultStoreMetadataService(metadataStore, metadataService);
Assertions.assertDoesNotThrow(() -> svc.commitWalObject(S3WALObject.newBuilder().build(),
new ArrayList<>(), new ArrayList<>()).join());
}

@Test
public void testRetryableCommitStreamObject() {
S3MetadataService metadataService = Mockito.mock(S3MetadataService.class);
Mockito.when(metadataService.commitStreamObject(ArgumentMatchers.any(), ArgumentMatchers.anyList()))
.thenReturn(CompletableFuture.failedFuture(new ControllerException(Code.MOCK_FAILURE_VALUE, "Mocked Failure")))
.thenReturn(CompletableFuture.completedFuture(null));
StoreMetadataService svc = new DefaultStoreMetadataService(metadataStore, metadataService);
Assertions.assertDoesNotThrow(() -> svc.commitStreamObject(S3StreamObject.newBuilder().build(),
new ArrayList<>()).join());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public CompletableFuture<CommitSSTObjectResponse> commitSSTObject(CommitSSTObjec
walObject, streamObjects, request.getCompactedObjectIds());

// Build compacted objects
return metaService.commitWalObject(walObject, streamObjects, request.getCompactedObjectIds()).thenApply(resp -> new CommitSSTObjectResponse());
return metaService.commitWalObject(walObject, streamObjects, request.getCompactedObjectIds())
.thenApply(resp -> new CommitSSTObjectResponse());
}

@Override
Expand Down

0 comments on commit 1f3f1e7

Please sign in to comment.