From ab903af14d1db5f0788245bc21e1e6531fd5e8c0 Mon Sep 17 00:00:00 2001 From: Goson Zhang <4675739@qq.com> Date: Wed, 9 Aug 2023 14:11:37 +0800 Subject: [PATCH] [INLONG-8670][DataProxy] Define in detail the exceptions actively thrown in Source (#8673) --- .../dataproxy/consts/StatConstants.java | 28 ++++-- .../exception/ChannelUnWritableException.java | 25 +++++ .../exception/PkgParseException.java | 25 +++++ .../source/ServerMessageHandler.java | 66 ++++++++----- .../source/httpMsg/HttpMessageHandler.java | 21 +++- .../inlong/dataproxy/utils/MessageUtils.java | 95 ------------------- 6 files changed, 130 insertions(+), 130 deletions(-) create mode 100644 inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/ChannelUnWritableException.java create mode 100644 inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/PkgParseException.java diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java index af6eb2b0d61..b67b3537fee 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java @@ -26,9 +26,20 @@ public class StatConstants { public static final java.lang.String EVENT_VISIT_OVERMAX = "visit.overmax"; public static final java.lang.String EVENT_VISIT_LINKIN = "visit.linkin"; public static final java.lang.String EVENT_VISIT_LINKOUT = "visit.linkout"; - public static final java.lang.String EVENT_VISIT_EXCEPTION = "visit.exception"; - // channel - public static final java.lang.String EVENT_REMOTE_UNWRITABLE = "socket.unwritable"; + // link + public static final java.lang.String EVENT_LINK_READ_TIMEOUT = "link.read.timeout"; + public static final java.lang.String EVENT_LINK_FRAME_OVERMAX = "link.frame.overmax"; + public static final java.lang.String EVENT_LINK_FRAME_CORRPUTED = "link.frame.corrupted"; + public static final java.lang.String EVENT_LINK_IO_EXCEPTION = "link.io.exception"; + public static final java.lang.String EVENT_LINK_UNKNOWN_EXCEPTION = "link.unknown.exception"; + public static final java.lang.String EVENT_LINK_UNWRITABLE = "link.unwritable"; + // http link + public static final java.lang.String EVENT_HTTP_LINK_READ_TIMEOUT = "http.link.read.timeout"; + public static final java.lang.String EVENT_HTTP_LINK_FRAME_OVERMAX = "http.link.frame.overmax"; + public static final java.lang.String EVENT_HTTP_LINK_FRAME_CORRPUTED = "http.link.frame.corrupted"; + public static final java.lang.String EVENT_HTTP_LINK_IO_EXCEPTION = "http.link.io.exception"; + public static final java.lang.String EVENT_HTTP_LINK_UNKNOWN_EXCEPTION = "http.link.unknown.exception"; + public static final java.lang.String EVENT_HTTP_LINK_UNWRITABLE = "http.link.unwritable"; // configure public static final java.lang.String EVENT_CONFIG_TOPIC_MISSING = "config.topic.missing"; public static final java.lang.String EVENT_CONFIG_IDNUM_EMPTY = "config.idnum.empty"; @@ -36,13 +47,12 @@ public class StatConstants { public static final java.lang.String EVENT_CONFIG_GROUP_IDNUM_INCONSTANT = "config.group.idnum.incons"; public static final java.lang.String EVENT_CONFIG_STREAMIDNUM_MISSING = "config.streamidnum.missing"; public static final java.lang.String EVENT_CONFIG_STREAM_IDNUM_INCONSTANT = "config.stream.idnum.incons"; - // source - public static final java.lang.String EVENT_PKG_READABLE_EMPTY = "pkg.readable.empty"; - public static final java.lang.String EVENT_PKG_READABLE_OVERMAX = "pkg.readable.overmax"; - public static final java.lang.String EVENT_PKG_READABLE_UNFILLED = "pkg.readable.unfilled"; - public static final java.lang.String EVENT_PKG_MSGTYPE_V0_INVALID = "pkg.msgtype.v0.invalid"; - public static final java.lang.String EVENT_PKG_MSGTYPE_V1_INVALID = "pkg.msgtype.v1.invalid"; // message + public static final java.lang.String EVENT_MSG_READABLE_EMPTY = "msg.readable.empty"; + public static final java.lang.String EVENT_MSG_READABLE_OVERMAX = "msg.readable.overmax"; + public static final java.lang.String EVENT_MSG_READABLE_UNFILLED = "msg.readable.unfilled"; + public static final java.lang.String EVENT_MSG_MSGTYPE_V0_INVALID = "msg.msgtype.v0.invalid"; + public static final java.lang.String EVENT_MSG_MSGTYPE_V1_INVALID = "msg.msgtype.v1.invalid"; public static final java.lang.String EVENT_MSG_BIN_TOTALLEN_BELOWMIN = "msg.bin.totallen.belowmin"; public static final java.lang.String EVENT_MSG_TXT_TOTALLEN_BELOWMIN = "msg.txt.totallen.belowmin"; public static final java.lang.String EVENT_MSG_DECODE_FAIL = "msg.decode.failure"; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/ChannelUnWritableException.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/ChannelUnWritableException.java new file mode 100644 index 00000000000..9de3f4c546c --- /dev/null +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/ChannelUnWritableException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.dataproxy.exception; + +public class ChannelUnWritableException extends Exception { + + public ChannelUnWritableException(String message) { + super(message); + } +} diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/PkgParseException.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/PkgParseException.java new file mode 100644 index 00000000000..148aefaec98 --- /dev/null +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/PkgParseException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.dataproxy.exception; + +public class PkgParseException extends Exception { + + public PkgParseException(String message) { + super(message); + } +} diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java index 984ddcb2c85..485d74468f0 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java @@ -25,6 +25,8 @@ import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.consts.StatConstants; +import org.apache.inlong.dataproxy.exception.ChannelUnWritableException; +import org.apache.inlong.dataproxy.exception.PkgParseException; import org.apache.inlong.dataproxy.source.v0msg.AbsV0MsgCodec; import org.apache.inlong.dataproxy.source.v0msg.CodecBinMsg; import org.apache.inlong.dataproxy.source.v0msg.CodecTextMsg; @@ -41,11 +43,15 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.timeout.ReadTimeoutException; import org.apache.commons.lang3.StringUtils; import org.apache.flume.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.TimeUnit; @@ -71,6 +77,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class); // log print count private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000); + // except log print count + private static final LogCounter exceptLogCounter = new LogCounter(10, 50000, 20 * 1000); private static final int INLONG_MSG_V1 = 1; @@ -89,7 +97,7 @@ public ServerMessageHandler(BaseSource source) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg == null) { - source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_EMPTY); + source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_EMPTY); return; } ByteBuf cb = (ByteBuf) msg; @@ -97,12 +105,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception int readableLength = cb.readableBytes(); if (readableLength == 0 && source.isFilterEmptyMsg()) { cb.clear(); - source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_EMPTY); + source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_EMPTY); return; } if (readableLength > source.getMaxMsgLength()) { - source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_OVERMAX); - throw new Exception("Error msg, readableLength(" + readableLength + + source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_OVERMAX); + throw new PkgParseException("Error msg, readableLength(" + readableLength + ") > max allowed message length (" + source.getMaxMsgLength() + ")"); } // save index @@ -112,7 +120,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (readableLength < totalDataLen + INLONG_LENGTH_FIELD_LENGTH) { // reset index when buffer is not satisfied. cb.resetReaderIndex(); - source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_UNFILLED); + source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_UNFILLED); return; } // read type @@ -126,8 +134,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception processV1Msg(ctx, cb, bodyLength); } else { // unknown message type - source.fileMetricIncSumStats(StatConstants.EVENT_PKG_MSGTYPE_V1_INVALID); - throw new Exception("Unknown V1 message version, version = " + msgTypeValue); + source.fileMetricIncSumStats(StatConstants.EVENT_MSG_MSGTYPE_V1_INVALID); + throw new PkgParseException("Unknown V1 message version, version = " + msgTypeValue); } } else { // process v0 messages @@ -135,11 +143,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception MsgType msgType = MsgType.valueOf(msgTypeValue); final long msgRcvTime = System.currentTimeMillis(); if (MsgType.MSG_UNKNOWN == msgType) { - source.fileMetricIncSumStats(StatConstants.EVENT_PKG_MSGTYPE_V0_INVALID); + source.fileMetricIncSumStats(StatConstants.EVENT_MSG_MSGTYPE_V0_INVALID); if (logger.isDebugEnabled()) { logger.debug("Received unknown message, channel {}", channel); } - throw new Exception("Unknown V0 message type, type = " + msgTypeValue); + throw new PkgParseException("Unknown V0 message type, type = " + msgTypeValue); } else if (MsgType.MSG_HEARTBEAT == msgType) { // send response message flushV0MsgPackage(source, channel, @@ -161,7 +169,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (logger.isDebugEnabled()) { logger.debug(errMsg + ", channel {}", channel); } - throw new Exception(errMsg); + throw new PkgParseException(errMsg); } msgCodec = new CodecBinMsg(totalDataLen, msgTypeValue, msgRcvTime, strRemoteIP); } else { @@ -172,7 +180,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (logger.isDebugEnabled()) { logger.debug(errMsg + ", channel {}", channel); } - throw new Exception(errMsg); + throw new PkgParseException(errMsg); } msgCodec = new CodecTextMsg(totalDataLen, msgTypeValue, msgRcvTime, strRemoteIP); } @@ -225,10 +233,22 @@ public void channelInactive(ChannelHandlerContext ctx) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION); - if (logCounter.shouldPrint()) { - logger.warn("{} received an exception from channel {}", - source.getCachedSrcName(), ctx.channel(), cause); + if (!(cause instanceof PkgParseException || cause instanceof ChannelUnWritableException)) { + if (cause instanceof ReadTimeoutException) { + source.fileMetricIncSumStats(StatConstants.EVENT_LINK_READ_TIMEOUT); + } else if (cause instanceof TooLongFrameException) { + source.fileMetricIncSumStats(StatConstants.EVENT_LINK_FRAME_OVERMAX); + } else if (cause instanceof CorruptedFrameException) { + source.fileMetricIncSumStats(StatConstants.EVENT_LINK_FRAME_CORRPUTED); + } else if (cause instanceof IOException) { + source.fileMetricIncSumStats(StatConstants.EVENT_LINK_IO_EXCEPTION); + } else { + source.fileMetricIncSumStats(StatConstants.EVENT_LINK_UNKNOWN_EXCEPTION); + } + if (exceptLogCounter.shouldPrint()) { + logger.warn("{} received an exception from channel {}", + source.getCachedSrcName(), ctx.channel(), cause); + } } if (ctx.channel() != null) { source.getAllChannels().remove(ctx.channel()); @@ -350,7 +370,7 @@ private void responsePackage(ChannelHandlerContext ctx, } else { buffer.release(); logger.warn("Send buffer2 is not writable, disconnect {}", remoteChannel); - throw new Exception("Send buffer2 is not writable, disconnect " + remoteChannel); + throw new ChannelUnWritableException("Send buffer2 is not writable, disconnect " + remoteChannel); } } @@ -437,12 +457,12 @@ private void processAndResponse(ChannelHandlerContext ctx, private void responseV0Msg(Channel channel, AbsV0MsgCodec msgObj, StringBuilder strBuff) throws Exception { // check channel status if (channel == null || !channel.isWritable()) { - source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE); + source.fileMetricIncSumStats(StatConstants.EVENT_LINK_UNWRITABLE); if (logCounter.shouldPrint()) { logger.warn("Prepare send msg but channel full, msgType={}, attr={}, channel={}", msgObj.getMsgType(), msgObj.getAttr(), channel); } - throw new Exception("Prepare send msg but channel full"); + throw new ChannelUnWritableException("Prepare send msg but channel full"); } // check whether return response message if (!msgObj.isNeedResp()) { @@ -487,7 +507,7 @@ private void procBinHeartbeatMsg(BaseSource source, Channel channel, if (logger.isDebugEnabled()) { logger.debug(errMsg + ", channel {}", channel); } - throw new Exception(errMsg); + throw new PkgParseException(errMsg); } // check validation int msgHeadPos = cb.readerIndex() - 5; @@ -502,7 +522,7 @@ private void procBinHeartbeatMsg(BaseSource source, Channel channel, if (logger.isDebugEnabled()) { logger.debug(errMsg + ", channel {}", channel); } - throw new Exception(errMsg); + throw new PkgParseException(errMsg); } if (totalDataLen + BIN_HB_TOTALLEN_SIZE < (bodyLen + attrLen + BIN_HB_FORMAT_SIZE)) { source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_LEN_MALFORMED); @@ -512,7 +532,7 @@ private void procBinHeartbeatMsg(BaseSource source, Channel channel, if (logger.isDebugEnabled()) { logger.debug(errMsg + ", channel {}", channel); } - throw new Exception(errMsg); + throw new PkgParseException(errMsg); } // read message content byte version = cb.getByte(msgHeadPos + BIN_HB_VERSION_OFFSET); @@ -678,11 +698,11 @@ private void flushV0MsgPackage(BaseSource source, Channel channel, if (channel == null || !channel.isWritable()) { // release allocated ByteBuf binBuffer.release(); - source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE); + source.fileMetricIncSumStats(StatConstants.EVENT_LINK_UNWRITABLE); if (logCounter.shouldPrint()) { logger.warn("Send msg but channel full, attr={}, channel={}", orgAttr, channel); } - throw new Exception("Send response but channel full"); + throw new ChannelUnWritableException("Send response but channel full"); } channel.writeAndFlush(binBuffer); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java index 00359fe983d..bf7d92a1903 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java @@ -36,6 +36,8 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -46,6 +48,7 @@ import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.ReadTimeoutException; import io.netty.util.CharsetUtil; import org.apache.commons.codec.CharEncoding; import org.apache.commons.codec.Charsets; @@ -71,6 +74,8 @@ public class HttpMessageHandler extends SimpleChannelInboundHandler getEventProcType(String syncSend, String pro return Pair.of(isOrderOrProxy, msgProcType); } - /** - * Return response to client in source - * @param commonAttrMap attribute map - * @param resultMap result map - * @param remoteChannel client channel - * @param msgType the message type - */ - public static void sourceReturnRspPackage(Map commonAttrMap, - Map resultMap, - Channel remoteChannel, - MsgType msgType) throws Exception { - ByteBuf binBuffer; - String origAttrs = null; - final StringBuilder strBuff = new StringBuilder(512); - // check channel and msg type - if (remoteChannel == null - || MsgType.MSG_UNKNOWN.equals(msgType)) { - if (logCounter.shouldPrint()) { - if (remoteChannel == null) { - logger.warn("remoteChannel == null, discard it!", remoteChannel); - } else { - logger.warn("Unknown msgType message from {}, discard it!", remoteChannel); - } - } - return; - } - // build message bytes - if (MsgType.MSG_HEARTBEAT.equals(msgType)) { - binBuffer = buildHeartBeatMsgRspPackage(); - } else { - // check whether return response message - String isAck = commonAttrMap.get(AttributeConstants.MESSAGE_IS_ACK); - if ("false".equalsIgnoreCase(isAck)) { - return; - } - origAttrs = (String) resultMap.get(ConfigConstants.DECODER_ATTRS); - // check whether channel is writable - if (!remoteChannel.isWritable()) { - strBuff.append("Send buffer is full1 by channel ") - .append(remoteChannel).append(", attr is ").append(origAttrs); - if (logCounter.shouldPrint()) { - logger.warn(strBuff.toString()); - } - throw new Exception(strBuff.toString()); - } - // build return attribute string - strBuff.append(ConfigConstants.DATAPROXY_IP_KEY) - .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(getLocalIp()); - String errCode = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE); - if (StringUtils.isNotEmpty(errCode)) { - strBuff.append(AttributeConstants.SEPARATOR) - .append(AttributeConstants.MESSAGE_PROCESS_ERRCODE) - .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errCode); - String errMsg = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG); - if (StringUtils.isNotEmpty(errMsg)) { - strBuff.append(AttributeConstants.SEPARATOR) - .append(AttributeConstants.MESSAGE_PROCESS_ERRMSG) - .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(errMsg); - } - } - if (StringUtils.isNotEmpty(origAttrs)) { - strBuff.append(AttributeConstants.SEPARATOR).append(origAttrs); - } - String destAttrs = strBuff.toString(); - // build response message bytes - if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) { - binBuffer = buildBinMsgRspPackage(destAttrs, - commonAttrMap.get(AttributeConstants.UNIQ_ID)); - } else if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) { - binBuffer = buildHBRspPackage(destAttrs, - (Byte) resultMap.get(ConfigConstants.VERSION_TYPE), 0); - } else { - // MsgType.MSG_ACK_SERVICE.equals(msgType) - // MsgType.MSG_ORIGINAL_RETURN.equals(msgType) - // MsgType.MSG_MULTI_BODY.equals(msgType) - // MsgType.MSG_MULTI_BODY_ATTR.equals(msgType) - binBuffer = buildDefMsgRspPackage(msgType, destAttrs); - } - } - // send response message - if (remoteChannel.isWritable()) { - remoteChannel.writeAndFlush(binBuffer); - } else { - // release allocated ByteBuf - binBuffer.release(); - strBuff.delete(0, strBuff.length()); - strBuff.append("Send buffer is full2 by channel ") - .append(remoteChannel).append(", attr is ").append(origAttrs); - if (logCounter.shouldPrint()) { - logger.warn(strBuff.toString()); - } - throw new Exception(strBuff.toString()); - } - } - /** * Return response to client in sink * @param event the event need to response