diff --git a/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java b/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java index 2282fc052..b42d99aee 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/ControllerServiceImpl.java @@ -319,26 +319,16 @@ public void deleteTopic(DeleteTopicRequest request, StreamObserver responseObserver) { - try { - metadataStore.reassignMessageQueue(request.getQueue().getTopicId(), request.getQueue().getQueueId(), request.getDstNodeId()).whenComplete((res, e) -> { - if (null != e) { - responseObserver.onError(e); - } else { - ReassignMessageQueueReply reply = ReassignMessageQueueReply.newBuilder() - .setStatus(Status.newBuilder().setCode(Code.OK).build()).build(); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } - }); - } catch (ControllerException e) { - ReassignMessageQueueReply reply = ReassignMessageQueueReply.newBuilder() - .setStatus(Status.newBuilder() - .setCode(Code.forNumber(e.getErrorCode())) - .setMessage(e.getMessage()).build()) - .build(); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } + metadataStore.reassignMessageQueue(request.getQueue().getTopicId(), request.getQueue().getQueueId(), request.getDstNodeId()).whenComplete((res, e) -> { + if (null != e) { + responseObserver.onError(e); + } else { + ReassignMessageQueueReply reply = ReassignMessageQueueReply.newBuilder() + .setStatus(Status.newBuilder().setCode(Code.OK).build()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + }); } @Override @@ -545,8 +535,8 @@ public void closeStream(CloseStreamRequest request, StreamObserver responseObserver) { - try { - metadataStore.trimStream(request.getStreamId(), request.getStreamEpoch(), request.getNewStartOffset()).whenComplete((res, e) -> { + metadataStore.trimStream(request.getStreamId(), request.getStreamEpoch(), request.getNewStartOffset()) + .whenComplete((res, e) -> { if (null != e) { responseObserver.onError(e); } else { @@ -557,15 +547,6 @@ public void trimStream(TrimStreamRequest request, StreamObserver notifyQueueClose(String target, long topicId, - int queueId) throws ControllerException { + public CompletableFuture notifyQueueClose(String target, long topicId, int queueId) { ControllerServiceGrpc.ControllerServiceFutureStub stub; try { diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java index 4d43dd3d7..b37693d05 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/MetadataStore.java @@ -141,7 +141,7 @@ public interface MetadataStore extends Closeable { CompletableFuture> listAssignments(Long topicId, Integer srcNodeId, Integer dstNodeId, AssignmentStatus status); - CompletableFuture reassignMessageQueue(long topicId, int queueId, int dstNodeId) throws ControllerException; + CompletableFuture reassignMessageQueue(long topicId, int queueId, int dstNodeId); CompletableFuture markMessageQueueAssignable(long topicId, int queueId); @@ -169,7 +169,7 @@ CompletableFuture> listAssignments(Long topicId, Integer s */ CompletableFuture onQueueClosed(long topicId, int queueId); - CompletableFuture trimStream(long streamId, long streamEpoch, long newStartOffset) throws ControllerException; + CompletableFuture trimStream(long streamId, long streamEpoch, long newStartOffset); CompletableFuture openStream(long streamId, long streamEpoch, int nodeId); diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java index 95995bc80..2da0f01e7 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStore.java @@ -222,11 +222,11 @@ public CompletableFuture describeCluster(DescribeClusterRequest request .setExpirationTimestamp(toTimestamp(lease.getExpirationTime())).build()); builder.setSummary(ClusterSummary.newBuilder() - .setNodeQuantity(nodes.size()) - .setTopicQuantity(topicManager.topicQuantity()) - .setQueueQuantity(topicManager.queueQuantity()) - .setStreamQuantity(topicManager.streamQuantity()) - .setGroupQuantity(groupManager.groupCache.groupQuantity()) + .setNodeQuantity(nodes.size()) + .setTopicQuantity(topicManager.topicQuantity()) + .setQueueQuantity(topicManager.queueQuantity()) + .setStreamQuantity(topicManager.streamQuantity()) + .setGroupQuantity(groupManager.groupCache.groupQuantity()) .build()); for (Map.Entry entry : nodes.entrySet()) { @@ -475,8 +475,7 @@ public CompletableFuture> listAssignments(Long topicId, In } @Override - public CompletableFuture reassignMessageQueue(long topicId, int queueId, - int dstNodeId) { + public CompletableFuture reassignMessageQueue(long topicId, int queueId, int dstNodeId) { CompletableFuture future = new CompletableFuture<>(); for (; ; ) { if (isLeader()) { @@ -1161,8 +1160,7 @@ public void applyStreamChange(List streams) { } @Override - public CompletableFuture trimStream(long streamId, long streamEpoch, - long newStartOffset) throws ControllerException { + public CompletableFuture trimStream(long streamId, long streamEpoch, long newStartOffset) { return s3MetadataManager.trimStream(streamId, streamEpoch, newStartOffset); } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java index 666c356ba..5a2626e40 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java @@ -618,8 +618,7 @@ public CompletableFuture, List>> listObje } - public CompletableFuture trimStream(long streamId, long streamEpoch, - long newStartOffset) throws ControllerException { + public CompletableFuture trimStream(long streamId, long streamEpoch, long newStartOffset) { CompletableFuture future = new CompletableFuture<>(); for (; ; ) { if (metadataStore.isLeader()) { diff --git a/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java b/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java index 3c8f16f88..dfcef8b15 100644 --- a/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java +++ b/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java @@ -70,14 +70,8 @@ public CompletableFuture maxDeliveryAttemptsOf(long consumerGroupId) { } @Override - public CompletableFuture trimStream(long streamId, long streamEpoch, - long newStartOffset) { - try { - return metadataStore.trimStream(streamId, streamEpoch, newStartOffset); - } catch (ControllerException e) { - LOGGER.error("Exception raised while trim stream for {}, {}, {}", streamId, streamEpoch, newStartOffset, e); - return null; - } + public CompletableFuture trimStream(long streamId, long streamEpoch, long newStartOffset) { + return metadataStore.trimStream(streamId, streamEpoch, newStartOffset); } @Override