diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/v2/consumer/ExtendReceiveMessageResponseStreamWriter.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/v2/consumer/ExtendReceiveMessageResponseStreamWriter.java index e72e0f81f..5da8045c0 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/v2/consumer/ExtendReceiveMessageResponseStreamWriter.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/v2/consumer/ExtendReceiveMessageResponseStreamWriter.java @@ -52,6 +52,7 @@ public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest request, Po } else { recordRpcLatency(ctx, Code.OK); } + break; case POLLING_FULL: recordRpcLatency(ctx, Code.TOO_MANY_REQUESTS); break; diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendRequestService.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendRequestService.java index 68be89776..f82539a90 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendRequestService.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendRequestService.java @@ -143,9 +143,15 @@ public CompletableFuture tryFetchMessages() { completed.set(true); return true; } - inflight.set(false); return false; - }); + }) + .exceptionally(ex -> { + LOGGER.error("Error while fetching messages for suspended request.", ex); + future.completeExceptionally(ex); + completed.set(true); + return true; + }) + .whenComplete((result, ex) -> inflight.set(false)); } return CompletableFuture.completedFuture(false); } @@ -171,6 +177,7 @@ public void notifyMessageArrival(String topic, int queueId, String tag) { suspendRequestCount.decrementAndGet(); } })); + break; } } }