Skip to content

Commit

Permalink
[INLONG-8670][DataProxy] Define in detail the exceptions actively thr…
Browse files Browse the repository at this point in the history
…own in Source (#8673)
  • Loading branch information
gosonzhang authored Aug 9, 2023
1 parent c36298c commit ab903af
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,33 @@ public class StatConstants {
public static final java.lang.String EVENT_VISIT_OVERMAX = "visit.overmax";
public static final java.lang.String EVENT_VISIT_LINKIN = "visit.linkin";
public static final java.lang.String EVENT_VISIT_LINKOUT = "visit.linkout";
public static final java.lang.String EVENT_VISIT_EXCEPTION = "visit.exception";
// channel
public static final java.lang.String EVENT_REMOTE_UNWRITABLE = "socket.unwritable";
// link
public static final java.lang.String EVENT_LINK_READ_TIMEOUT = "link.read.timeout";
public static final java.lang.String EVENT_LINK_FRAME_OVERMAX = "link.frame.overmax";
public static final java.lang.String EVENT_LINK_FRAME_CORRPUTED = "link.frame.corrupted";
public static final java.lang.String EVENT_LINK_IO_EXCEPTION = "link.io.exception";
public static final java.lang.String EVENT_LINK_UNKNOWN_EXCEPTION = "link.unknown.exception";
public static final java.lang.String EVENT_LINK_UNWRITABLE = "link.unwritable";
// http link
public static final java.lang.String EVENT_HTTP_LINK_READ_TIMEOUT = "http.link.read.timeout";
public static final java.lang.String EVENT_HTTP_LINK_FRAME_OVERMAX = "http.link.frame.overmax";
public static final java.lang.String EVENT_HTTP_LINK_FRAME_CORRPUTED = "http.link.frame.corrupted";
public static final java.lang.String EVENT_HTTP_LINK_IO_EXCEPTION = "http.link.io.exception";
public static final java.lang.String EVENT_HTTP_LINK_UNKNOWN_EXCEPTION = "http.link.unknown.exception";
public static final java.lang.String EVENT_HTTP_LINK_UNWRITABLE = "http.link.unwritable";
// configure
public static final java.lang.String EVENT_CONFIG_TOPIC_MISSING = "config.topic.missing";
public static final java.lang.String EVENT_CONFIG_IDNUM_EMPTY = "config.idnum.empty";
public static final java.lang.String EVENT_CONFIG_GROUPIDNUM_MISSING = "config.groupidnum.missing";
public static final java.lang.String EVENT_CONFIG_GROUP_IDNUM_INCONSTANT = "config.group.idnum.incons";
public static final java.lang.String EVENT_CONFIG_STREAMIDNUM_MISSING = "config.streamidnum.missing";
public static final java.lang.String EVENT_CONFIG_STREAM_IDNUM_INCONSTANT = "config.stream.idnum.incons";
// source
public static final java.lang.String EVENT_PKG_READABLE_EMPTY = "pkg.readable.empty";
public static final java.lang.String EVENT_PKG_READABLE_OVERMAX = "pkg.readable.overmax";
public static final java.lang.String EVENT_PKG_READABLE_UNFILLED = "pkg.readable.unfilled";
public static final java.lang.String EVENT_PKG_MSGTYPE_V0_INVALID = "pkg.msgtype.v0.invalid";
public static final java.lang.String EVENT_PKG_MSGTYPE_V1_INVALID = "pkg.msgtype.v1.invalid";
// message
public static final java.lang.String EVENT_MSG_READABLE_EMPTY = "msg.readable.empty";
public static final java.lang.String EVENT_MSG_READABLE_OVERMAX = "msg.readable.overmax";
public static final java.lang.String EVENT_MSG_READABLE_UNFILLED = "msg.readable.unfilled";
public static final java.lang.String EVENT_MSG_MSGTYPE_V0_INVALID = "msg.msgtype.v0.invalid";
public static final java.lang.String EVENT_MSG_MSGTYPE_V1_INVALID = "msg.msgtype.v1.invalid";
public static final java.lang.String EVENT_MSG_BIN_TOTALLEN_BELOWMIN = "msg.bin.totallen.belowmin";
public static final java.lang.String EVENT_MSG_TXT_TOTALLEN_BELOWMIN = "msg.txt.totallen.belowmin";
public static final java.lang.String EVENT_MSG_DECODE_FAIL = "msg.decode.failure";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.dataproxy.exception;

public class ChannelUnWritableException extends Exception {

public ChannelUnWritableException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.dataproxy.exception;

public class PkgParseException extends Exception {

public PkgParseException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.exception.ChannelUnWritableException;
import org.apache.inlong.dataproxy.exception.PkgParseException;
import org.apache.inlong.dataproxy.source.v0msg.AbsV0MsgCodec;
import org.apache.inlong.dataproxy.source.v0msg.CodecBinMsg;
import org.apache.inlong.dataproxy.source.v0msg.CodecTextMsg;
Expand All @@ -41,11 +43,15 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.timeout.ReadTimeoutException;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -71,6 +77,8 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
// log print count
private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
// except log print count
private static final LogCounter exceptLogCounter = new LogCounter(10, 50000, 20 * 1000);

private static final int INLONG_MSG_V1 = 1;

Expand All @@ -89,20 +97,20 @@ public ServerMessageHandler(BaseSource source) {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg == null) {
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_EMPTY);
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_EMPTY);
return;
}
ByteBuf cb = (ByteBuf) msg;
try {
int readableLength = cb.readableBytes();
if (readableLength == 0 && source.isFilterEmptyMsg()) {
cb.clear();
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_EMPTY);
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_EMPTY);
return;
}
if (readableLength > source.getMaxMsgLength()) {
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_OVERMAX);
throw new Exception("Error msg, readableLength(" + readableLength +
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_OVERMAX);
throw new PkgParseException("Error msg, readableLength(" + readableLength +
") > max allowed message length (" + source.getMaxMsgLength() + ")");
}
// save index
Expand All @@ -112,7 +120,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (readableLength < totalDataLen + INLONG_LENGTH_FIELD_LENGTH) {
// reset index when buffer is not satisfied.
cb.resetReaderIndex();
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_UNFILLED);
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_READABLE_UNFILLED);
return;
}
// read type
Expand All @@ -126,20 +134,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
processV1Msg(ctx, cb, bodyLength);
} else {
// unknown message type
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_MSGTYPE_V1_INVALID);
throw new Exception("Unknown V1 message version, version = " + msgTypeValue);
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_MSGTYPE_V1_INVALID);
throw new PkgParseException("Unknown V1 message version, version = " + msgTypeValue);
}
} else {
// process v0 messages
Channel channel = ctx.channel();
MsgType msgType = MsgType.valueOf(msgTypeValue);
final long msgRcvTime = System.currentTimeMillis();
if (MsgType.MSG_UNKNOWN == msgType) {
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_MSGTYPE_V0_INVALID);
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_MSGTYPE_V0_INVALID);
if (logger.isDebugEnabled()) {
logger.debug("Received unknown message, channel {}", channel);
}
throw new Exception("Unknown V0 message type, type = " + msgTypeValue);
throw new PkgParseException("Unknown V0 message type, type = " + msgTypeValue);
} else if (MsgType.MSG_HEARTBEAT == msgType) {
// send response message
flushV0MsgPackage(source, channel,
Expand All @@ -161,7 +169,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (logger.isDebugEnabled()) {
logger.debug(errMsg + ", channel {}", channel);
}
throw new Exception(errMsg);
throw new PkgParseException(errMsg);
}
msgCodec = new CodecBinMsg(totalDataLen, msgTypeValue, msgRcvTime, strRemoteIP);
} else {
Expand All @@ -172,7 +180,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (logger.isDebugEnabled()) {
logger.debug(errMsg + ", channel {}", channel);
}
throw new Exception(errMsg);
throw new PkgParseException(errMsg);
}
msgCodec = new CodecTextMsg(totalDataLen, msgTypeValue, msgRcvTime, strRemoteIP);
}
Expand Down Expand Up @@ -225,10 +233,22 @@ public void channelInactive(ChannelHandlerContext ctx) {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
if (logCounter.shouldPrint()) {
logger.warn("{} received an exception from channel {}",
source.getCachedSrcName(), ctx.channel(), cause);
if (!(cause instanceof PkgParseException || cause instanceof ChannelUnWritableException)) {
if (cause instanceof ReadTimeoutException) {
source.fileMetricIncSumStats(StatConstants.EVENT_LINK_READ_TIMEOUT);
} else if (cause instanceof TooLongFrameException) {
source.fileMetricIncSumStats(StatConstants.EVENT_LINK_FRAME_OVERMAX);
} else if (cause instanceof CorruptedFrameException) {
source.fileMetricIncSumStats(StatConstants.EVENT_LINK_FRAME_CORRPUTED);
} else if (cause instanceof IOException) {
source.fileMetricIncSumStats(StatConstants.EVENT_LINK_IO_EXCEPTION);
} else {
source.fileMetricIncSumStats(StatConstants.EVENT_LINK_UNKNOWN_EXCEPTION);
}
if (exceptLogCounter.shouldPrint()) {
logger.warn("{} received an exception from channel {}",
source.getCachedSrcName(), ctx.channel(), cause);
}
}
if (ctx.channel() != null) {
source.getAllChannels().remove(ctx.channel());
Expand Down Expand Up @@ -350,7 +370,7 @@ private void responsePackage(ChannelHandlerContext ctx,
} else {
buffer.release();
logger.warn("Send buffer2 is not writable, disconnect {}", remoteChannel);
throw new Exception("Send buffer2 is not writable, disconnect " + remoteChannel);
throw new ChannelUnWritableException("Send buffer2 is not writable, disconnect " + remoteChannel);
}
}

Expand Down Expand Up @@ -437,12 +457,12 @@ private void processAndResponse(ChannelHandlerContext ctx,
private void responseV0Msg(Channel channel, AbsV0MsgCodec msgObj, StringBuilder strBuff) throws Exception {
// check channel status
if (channel == null || !channel.isWritable()) {
source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE);
source.fileMetricIncSumStats(StatConstants.EVENT_LINK_UNWRITABLE);
if (logCounter.shouldPrint()) {
logger.warn("Prepare send msg but channel full, msgType={}, attr={}, channel={}",
msgObj.getMsgType(), msgObj.getAttr(), channel);
}
throw new Exception("Prepare send msg but channel full");
throw new ChannelUnWritableException("Prepare send msg but channel full");
}
// check whether return response message
if (!msgObj.isNeedResp()) {
Expand Down Expand Up @@ -487,7 +507,7 @@ private void procBinHeartbeatMsg(BaseSource source, Channel channel,
if (logger.isDebugEnabled()) {
logger.debug(errMsg + ", channel {}", channel);
}
throw new Exception(errMsg);
throw new PkgParseException(errMsg);
}
// check validation
int msgHeadPos = cb.readerIndex() - 5;
Expand All @@ -502,7 +522,7 @@ private void procBinHeartbeatMsg(BaseSource source, Channel channel,
if (logger.isDebugEnabled()) {
logger.debug(errMsg + ", channel {}", channel);
}
throw new Exception(errMsg);
throw new PkgParseException(errMsg);
}
if (totalDataLen + BIN_HB_TOTALLEN_SIZE < (bodyLen + attrLen + BIN_HB_FORMAT_SIZE)) {
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_LEN_MALFORMED);
Expand All @@ -512,7 +532,7 @@ private void procBinHeartbeatMsg(BaseSource source, Channel channel,
if (logger.isDebugEnabled()) {
logger.debug(errMsg + ", channel {}", channel);
}
throw new Exception(errMsg);
throw new PkgParseException(errMsg);
}
// read message content
byte version = cb.getByte(msgHeadPos + BIN_HB_VERSION_OFFSET);
Expand Down Expand Up @@ -678,11 +698,11 @@ private void flushV0MsgPackage(BaseSource source, Channel channel,
if (channel == null || !channel.isWritable()) {
// release allocated ByteBuf
binBuffer.release();
source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE);
source.fileMetricIncSumStats(StatConstants.EVENT_LINK_UNWRITABLE);
if (logCounter.shouldPrint()) {
logger.warn("Send msg but channel full, attr={}, channel={}", orgAttr, channel);
}
throw new Exception("Send response but channel full");
throw new ChannelUnWritableException("Send response but channel full");
}
channel.writeAndFlush(binBuffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
Expand All @@ -46,6 +48,7 @@
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.CharsetUtil;
import org.apache.commons.codec.CharEncoding;
import org.apache.commons.codec.Charsets;
Expand All @@ -71,6 +74,8 @@ public class HttpMessageHandler extends SimpleChannelInboundHandler<FullHttpRequ
private static final Logger logger = LoggerFactory.getLogger(HttpMessageHandler.class);
// log print count
private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);
// exception log print count
private static final LogCounter exceptLogCounter = new LogCounter(10, 50000, 20 * 1000);
private final BaseSource source;

/**
Expand Down Expand Up @@ -214,8 +219,18 @@ public void channelInactive(ChannelHandlerContext ctx) {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
if (logCounter.shouldPrint()) {
if (cause instanceof ReadTimeoutException) {
source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_READ_TIMEOUT);
} else if (cause instanceof TooLongFrameException) {
source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_FRAME_OVERMAX);
} else if (cause instanceof CorruptedFrameException) {
source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_FRAME_CORRPUTED);
} else if (cause instanceof IOException) {
source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_IO_EXCEPTION);
} else {
source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_UNKNOWN_EXCEPTION);
}
if (exceptLogCounter.shouldPrint()) {
logger.warn("{} received an exception from channel {}",
source.getCachedSrcName(), ctx.channel(), cause);
}
Expand Down Expand Up @@ -420,7 +435,7 @@ private void sendResponse(ChannelHandlerContext ctx,
return;
}
if (!ctx.channel().isWritable()) {
source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE);
source.fileMetricIncSumStats(StatConstants.EVENT_HTTP_LINK_UNWRITABLE);
if (logCounter.shouldPrint()) {
logger.warn("Send msg but channel full, channel={}", ctx.channel());
}
Expand Down
Loading

0 comments on commit ab903af

Please sign in to comment.