Skip to content

Commit

Permalink
fix: refine api of metadata store (#490)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 27, 2023
1 parent db1af30 commit 3000b7a
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,26 +319,16 @@ public void deleteTopic(DeleteTopicRequest request, StreamObserver<DeleteTopicRe
@Override
public void reassignMessageQueue(ReassignMessageQueueRequest request,
StreamObserver<ReassignMessageQueueReply> responseObserver) {
try {
metadataStore.reassignMessageQueue(request.getQueue().getTopicId(), request.getQueue().getQueueId(), request.getDstNodeId()).whenComplete((res, e) -> {
if (null != e) {
responseObserver.onError(e);
} else {
ReassignMessageQueueReply reply = ReassignMessageQueueReply.newBuilder()
.setStatus(Status.newBuilder().setCode(Code.OK).build()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
});
} catch (ControllerException e) {
ReassignMessageQueueReply reply = ReassignMessageQueueReply.newBuilder()
.setStatus(Status.newBuilder()
.setCode(Code.forNumber(e.getErrorCode()))
.setMessage(e.getMessage()).build())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
metadataStore.reassignMessageQueue(request.getQueue().getTopicId(), request.getQueue().getQueueId(), request.getDstNodeId()).whenComplete((res, e) -> {
if (null != e) {
responseObserver.onError(e);
} else {
ReassignMessageQueueReply reply = ReassignMessageQueueReply.newBuilder()
.setStatus(Status.newBuilder().setCode(Code.OK).build()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
});
}

@Override
Expand Down Expand Up @@ -545,8 +535,8 @@ public void closeStream(CloseStreamRequest request, StreamObserver<CloseStreamRe

@Override
public void trimStream(TrimStreamRequest request, StreamObserver<TrimStreamReply> responseObserver) {
try {
metadataStore.trimStream(request.getStreamId(), request.getStreamEpoch(), request.getNewStartOffset()).whenComplete((res, e) -> {
metadataStore.trimStream(request.getStreamId(), request.getStreamEpoch(), request.getNewStartOffset())
.whenComplete((res, e) -> {
if (null != e) {
responseObserver.onError(e);
} else {
Expand All @@ -557,15 +547,6 @@ public void trimStream(TrimStreamRequest request, StreamObserver<TrimStreamReply
responseObserver.onCompleted();
}
});
} catch (ControllerException e) {
TrimStreamReply reply = TrimStreamReply.newBuilder()
.setStatus(Status.newBuilder()
.setCode(Code.forNumber(e.getErrorCode()))
.setMessage(e.getMessage()).build())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,7 @@ public void onFailure(@Nonnull Throwable t) {
}

@Override
public CompletableFuture<Void> notifyQueueClose(String target, long topicId,
int queueId) throws ControllerException {
public CompletableFuture<Void> notifyQueueClose(String target, long topicId, int queueId) {

ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public interface MetadataStore extends Closeable {
CompletableFuture<List<QueueAssignment>> listAssignments(Long topicId, Integer srcNodeId, Integer dstNodeId,
AssignmentStatus status);

CompletableFuture<Void> reassignMessageQueue(long topicId, int queueId, int dstNodeId) throws ControllerException;
CompletableFuture<Void> reassignMessageQueue(long topicId, int queueId, int dstNodeId);

CompletableFuture<Void> markMessageQueueAssignable(long topicId, int queueId);

Expand Down Expand Up @@ -169,7 +169,7 @@ CompletableFuture<List<QueueAssignment>> listAssignments(Long topicId, Integer s
*/
CompletableFuture<Void> onQueueClosed(long topicId, int queueId);

CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset) throws ControllerException;
CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset);

CompletableFuture<StreamMetadata> openStream(long streamId, long streamEpoch, int nodeId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,11 @@ public CompletableFuture<Cluster> describeCluster(DescribeClusterRequest request
.setExpirationTimestamp(toTimestamp(lease.getExpirationTime())).build());

builder.setSummary(ClusterSummary.newBuilder()
.setNodeQuantity(nodes.size())
.setTopicQuantity(topicManager.topicQuantity())
.setQueueQuantity(topicManager.queueQuantity())
.setStreamQuantity(topicManager.streamQuantity())
.setGroupQuantity(groupManager.groupCache.groupQuantity())
.setNodeQuantity(nodes.size())
.setTopicQuantity(topicManager.topicQuantity())
.setQueueQuantity(topicManager.queueQuantity())
.setStreamQuantity(topicManager.streamQuantity())
.setGroupQuantity(groupManager.groupCache.groupQuantity())
.build());

for (Map.Entry<Integer, BrokerNode> entry : nodes.entrySet()) {
Expand Down Expand Up @@ -475,8 +475,7 @@ public CompletableFuture<List<QueueAssignment>> listAssignments(Long topicId, In
}

@Override
public CompletableFuture<Void> reassignMessageQueue(long topicId, int queueId,
int dstNodeId) {
public CompletableFuture<Void> reassignMessageQueue(long topicId, int queueId, int dstNodeId) {
CompletableFuture<Void> future = new CompletableFuture<>();
for (; ; ) {
if (isLeader()) {
Expand Down Expand Up @@ -1161,8 +1160,7 @@ public void applyStreamChange(List<Stream> streams) {
}

@Override
public CompletableFuture<Void> trimStream(long streamId, long streamEpoch,
long newStartOffset) throws ControllerException {
public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset) {
return s3MetadataManager.trimStream(streamId, streamEpoch, newStartOffset);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,7 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
}


public CompletableFuture<Void> trimStream(long streamId, long streamEpoch,
long newStartOffset) throws ControllerException {
public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset) {
CompletableFuture<Void> future = new CompletableFuture<>();
for (; ; ) {
if (metadataStore.isLeader()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,8 @@ public CompletableFuture<Integer> maxDeliveryAttemptsOf(long consumerGroupId) {
}

@Override
public CompletableFuture<Void> trimStream(long streamId, long streamEpoch,
long newStartOffset) {
try {
return metadataStore.trimStream(streamId, streamEpoch, newStartOffset);
} catch (ControllerException e) {
LOGGER.error("Exception raised while trim stream for {}, {}, {}", streamId, streamEpoch, newStartOffset, e);
return null;
}
public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset) {
return metadataStore.trimStream(streamId, streamEpoch, newStartOffset);
}

@Override
Expand Down

0 comments on commit 3000b7a

Please sign in to comment.