Skip to content

Commit

Permalink
chore(proxy): add global exception handler for proxy (#537)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Nov 2, 2023
1 parent 9f166e0 commit 90cc1cf
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class RocketMQException extends Exception {

private int errorCode;
private final int errorCode;

public RocketMQException(int errorCode) {
this.errorCode = errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.automq.rocketmq.common.exception;

public class RocketMQRuntimeException extends RuntimeException {
private int errorCode;
protected final int errorCode;

public RocketMQRuntimeException(int errorCode) {
this.errorCode = errorCode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.proxy.exception;

import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Status;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;

public class ExceptionHandler {
private static final Map<Code, Integer> PROXY_EXCEPTION_RESPONSE_CODE_MAP = new HashMap<>() {
{
put(Code.FORBIDDEN, ResponseCode.NO_PERMISSION);
put(Code.TOPIC_NOT_FOUND, ResponseCode.TOPIC_NOT_EXIST);
put(Code.CONSUMER_GROUP_NOT_FOUND, ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
put(Code.MESSAGE_NOT_FOUND, ResponseCode.NO_MESSAGE);
put(Code.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, ResponseCode.MESSAGE_ILLEGAL);
put(Code.INTERNAL_ERROR, ResponseCode.SYSTEM_ERROR);
put(Code.INTERNAL_SERVER_ERROR, ResponseCode.SYSTEM_ERROR);
}
};

public static Optional<RemotingCommand> convertToRemotingResponse(Throwable throwable) {
if (throwable instanceof ProxyException proxyException) {
Integer responseCode = PROXY_EXCEPTION_RESPONSE_CODE_MAP.getOrDefault(proxyException.getErrorCode(), ResponseCode.SYSTEM_ERROR);
RemotingCommand response = RemotingCommand.buildErrorResponse(responseCode, proxyException.getMessage());
return Optional.of(response);
}
return Optional.empty();
}

public static Optional<Status> convertToGrpcStatus(Throwable throwable) {
if (throwable instanceof ProxyException proxyException) {
return Optional.of(
Status.newBuilder()
.setCode(proxyException.getErrorCode())
.setMessage(proxyException.getMessage())
.build()
);
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.proxy.exception;

import apache.rocketmq.v2.Code;
import com.automq.rocketmq.common.exception.RocketMQRuntimeException;

public class ProxyException extends RocketMQRuntimeException {
public ProxyException(Code errorCode) {
super(errorCode.getNumber());
}

public ProxyException(Code errorCode, String message) {
super(errorCode.getNumber(), message);
}

public ProxyException(Code errorCode, String message, Throwable cause) {
super(errorCode.getNumber(), message, cause);
}

public ProxyException(Code errorCode, Throwable cause) {
super(errorCode.getNumber(), cause);
}

public ProxyException(Code errorCode, String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(errorCode.getNumber(), message, cause, enableSuppression, writableStackTrace);
}

public Code getErrorCode() {
return Code.forNumber(errorCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.Status;
import com.automq.rocketmq.proxy.exception.ExceptionHandler;
import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
import io.grpc.stub.StreamObserver;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.function.Function;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
import org.apache.rocketmq.proxy.grpc.v2.GrpcMessingActivity;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseWriter;

public class ExtendGrpcMessagingApplication extends GrpcMessagingApplication {
public ExtendGrpcMessagingApplication(GrpcMessingActivity grpcMessingActivity) {
Expand All @@ -45,6 +48,10 @@ protected ProxyContext createContext() {
}

private <T> String getResponseStatus(T response) {
if (response == null) {
return "unknown";
}

if (response instanceof SendMessageResponse detailResponse) {
return detailResponse.getStatus().getCode().name().toLowerCase();
} else if (response instanceof ReceiveMessageResponse detailResponse) {
Expand Down Expand Up @@ -73,8 +80,22 @@ private <T> String getResponseStatus(T response) {
@Override
protected <V, T> void writeResponse(ProxyContext context, V request, T response, StreamObserver<T> responseObserver,
Throwable t, Function<Status, T> errorResponseCreator) {
if (t != null) {
Optional<Status> status = ExceptionHandler.convertToGrpcStatus(t);
if (status.isPresent()) {
ProxyMetricsManager.recordRpcLatency(context.getProtocolType(), context.getAction(),
status.get().getCode().name().toLowerCase(), ((ProxyContextExt) context).getElapsedTimeNanos());
ResponseWriter.getInstance().write(
responseObserver,
errorResponseCreator.apply(status.get())
);
return;
}
}

ProxyMetricsManager.recordRpcLatency(context.getProtocolType(), context.getAction(),
getResponseStatus(response), ((ProxyContextExt) context).getElapsedTimeNanos());

super.writeResponse(context, request, response, responseObserver, t, errorResponseCreator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.automq.rocketmq.proxy.remoting.activity;

import com.automq.rocketmq.proxy.exception.ExceptionHandler;
import com.automq.rocketmq.proxy.remoting.RemotingUtil;
import io.netty.channel.ChannelHandlerContext;
import java.util.Optional;
Expand Down Expand Up @@ -285,4 +286,16 @@ private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, Remoting
});
return null;
}

@Override
protected void writeErrResponse(ChannelHandlerContext ctx, ProxyContext context, RemotingCommand request,
Throwable t) {
Optional<RemotingCommand> response = ExceptionHandler.convertToRemotingResponse(t);
if (response.isPresent()) {
writeResponse(ctx, context, request, response.get(), t);
return;
}

super.writeErrResponse(ctx, context, request, t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.automq.rocketmq.proxy.remoting.activity;

import com.automq.rocketmq.proxy.exception.ExceptionHandler;
import com.automq.rocketmq.proxy.remoting.RemotingUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
Expand Down Expand Up @@ -244,4 +245,16 @@ private RemotingCommand pullMessage(ChannelHandlerContext ctx, RemotingCommand r

return null;
}

@Override
protected void writeErrResponse(ChannelHandlerContext ctx, ProxyContext context, RemotingCommand request,
Throwable t) {
Optional<RemotingCommand> response = ExceptionHandler.convertToRemotingResponse(t);
if (response.isPresent()) {
writeResponse(ctx, context, request, response.get(), t);
return;
}

super.writeErrResponse(ctx, context, request, t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.automq.rocketmq.proxy.remoting.activity;

import com.automq.rocketmq.proxy.exception.ExceptionHandler;
import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor;
import com.automq.rocketmq.proxy.remoting.RemotingUtil;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -236,4 +237,16 @@ public AddressableMessageQueue select(ProxyContext ctx, MessageQueueView message
return new AddressableMessageQueue(messageQueue, null);
}
}

@Override
protected void writeErrResponse(ChannelHandlerContext ctx, ProxyContext context, RemotingCommand request,
Throwable t) {
Optional<RemotingCommand> response = ExceptionHandler.convertToRemotingResponse(t);
if (response.isPresent()) {
writeResponse(ctx, context, request, response.get(), t);
return;
}

super.writeErrResponse(ctx, context, request, t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.automq.rocketmq.common.util.CommonUtil;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.proxy.exception.ProxyException;
import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager;
import com.automq.rocketmq.proxy.model.VirtualQueue;
import com.automq.rocketmq.proxy.util.FlatMessageUtil;
Expand Down Expand Up @@ -58,7 +59,6 @@
import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -144,7 +144,7 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, Address
if (topic.getTopicId() != virtualQueue.topicId()) {
LOGGER.error("Topic id in request header {} does not match topic id in message queue {}, maybe the topic is recreated.",
topic.getTopicId(), virtualQueue.topicId());
return CompletableFuture.failedFuture(new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist"));
return CompletableFuture.failedFuture(new ProxyException(apache.rocketmq.v2.Code.TOPIC_NOT_FOUND, "Topic resource does not exist."));
}
FlatMessage flatMessage = FlatMessageUtil.convertTo(topic.getTopicId(), virtualQueue.physicalQueueId(), config.hostName(), message);

Expand Down Expand Up @@ -210,7 +210,7 @@ public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext ctx, Rece
if (topic.getTopicId() != virtualQueue.topicId()) {
LOGGER.error("Topic id in request header {} does not match topic id in message queue {}, maybe the topic is recreated.",
topic.getTopicId(), virtualQueue.topicId());
throw new CompletionException(new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist"));
throw new ProxyException(apache.rocketmq.v2.Code.TOPIC_NOT_FOUND, "Topic resource does not exist.");
}
return Pair.of(topic, group);
}).thenCompose(pair -> {
Expand All @@ -223,7 +223,7 @@ public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext ctx, Rece
if (pullResult.status() == com.automq.rocketmq.store.model.message.PullResult.Status.FOUND) {
return pullResult.messageList().get(0);
}
throw new IllegalArgumentException("Message not found.");
throw new ProxyException(apache.rocketmq.v2.Code.MESSAGE_NOT_FOUND, "Message not found from server.");
}).thenCompose(messageExt -> {
if (messageExt.deliveryAttempts() > group.getMaxDeliveryAttempt()) {
return deadLetterService.send(group.getGroupId(), messageExt);
Expand Down Expand Up @@ -384,11 +384,11 @@ public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMess
if (topic.getTopicId() != virtualQueue.topicId()) {
LOGGER.error("Topic id in request header {} does not match topic id in message queue {}, maybe the topic is recreated.",
topic.getTopicId(), virtualQueue.topicId());
throw new CompletionException(new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist"));
throw new ProxyException(apache.rocketmq.v2.Code.TOPIC_NOT_FOUND, "Topic resource does not exist.");
}

if (group.getSubMode() != SubscriptionMode.SUB_MODE_POP) {
throw new CompletionException(new MQBrokerException(ResponseCode.NO_PERMISSION, String.format("The consumer group [pullGroup] is not allowed to consume message with pop mode.", group.getName())));
throw new ProxyException(apache.rocketmq.v2.Code.FORBIDDEN, String.format("The consumer group [%s] is not allowed to consume message with pop mode.", group.getName()));
}

topicReference.set(topic);
Expand Down Expand Up @@ -536,7 +536,7 @@ public CompletableFuture<Void> updateConsumerOffset(ProxyContext ctx, Addressabl
return store.resetConsumeOffset(consumerGroup.getGroupId(), topic.getTopicId(), virtualQueue.physicalQueueId(), requestHeader.getCommitOffset());
}).thenAccept(resetConsumeOffsetResult -> {
if (resetConsumeOffsetResult.status() != ResetConsumeOffsetResult.Status.SUCCESS) {
throw new CompletionException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "Reset consume offset failed"));
throw new ProxyException(apache.rocketmq.v2.Code.INTERNAL_ERROR, "Reset consume offset failed");
}
});
}
Expand Down Expand Up @@ -579,11 +579,11 @@ public CompletableFuture<PullResult> pullMessage(ProxyContext ctx, AddressableMe
if (topic.getTopicId() != virtualQueue.topicId()) {
LOGGER.error("Topic id in request header {} does not match topic id in message queue {}, maybe the topic is recreated.",
topic.getTopicId(), virtualQueue.topicId());
throw new CompletionException(new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist"));
throw new ProxyException(apache.rocketmq.v2.Code.TOPIC_NOT_FOUND, "Topic resource does not exist.");
}

if (group.getSubMode() != SubscriptionMode.SUB_MODE_PULL) {
throw new CompletionException(new MQBrokerException(ResponseCode.NO_PERMISSION, String.format("The consumer group [%s] is not allowed to consume message with pull mode.", group.getName())));
throw new ProxyException(apache.rocketmq.v2.Code.FORBIDDEN, String.format("The consumer group [%s] is not allowed to consume message with pull mode.", group.getName()));
}

topicReference.set(topic);
Expand Down Expand Up @@ -685,7 +685,7 @@ private CompletableFuture<Topic> topicOf(String topicName) {
Throwable t = ExceptionUtils.getRealException(throwable);
if (t instanceof ControllerException controllerException) {
if (controllerException.getErrorCode() == Code.NOT_FOUND.ordinal()) {
throw new CompletionException(new MQBrokerException(ResponseCode.TOPIC_NOT_EXIST, "Topic not exist"));
throw new ProxyException(apache.rocketmq.v2.Code.TOPIC_NOT_FOUND, "Topic resource does not exist.");
}
}
// Rethrow other exceptions.
Expand All @@ -700,7 +700,7 @@ private CompletableFuture<ConsumerGroup> consumerGroupOf(String groupName) {
Throwable t = ExceptionUtils.getRealException(throwable);
if (t instanceof ControllerException controllerException) {
if (controllerException.getErrorCode() == Code.NOT_FOUND.ordinal()) {
throw new CompletionException(new MQBrokerException(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST, "Consumer group not found"));
throw new ProxyException(apache.rocketmq.v2.Code.CONSUMER_GROUP_NOT_FOUND, "Consumer group resource does not exist.");
}
}
// Rethrow other exceptions.
Expand Down
Loading

0 comments on commit 90cc1cf

Please sign in to comment.