Skip to content

Commit

Permalink
[INLONG-8657][DataProxy] Cache Source, Sink name and Channel object c…
Browse files Browse the repository at this point in the history
…ontent (#8658)
  • Loading branch information
gosonzhang authored Aug 8, 2023
1 parent 11a5b40 commit 783426e
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
private static final LogCounter logCounter = new LogCounter(10, 100000, 30 * 1000);

private final long MQ_CLUSTER_STATUS_CHECK_DUR_MS = 2000L;

private String cachedSinkName;
private Channel cachedMsgChannel;
private Context parentContext;
private MessageQueueZoneSinkContext context;
private final List<MessageQueueZoneWorker> workers = new ArrayList<>();
Expand Down Expand Up @@ -90,7 +91,8 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
*/
@Override
public void configure(Context context) {
logger.info("{} start to configure, context:{}.", this.getName(), context.toString());
this.cachedSinkName = getName();
logger.info("{} start to configure, context:{}.", this.cachedSinkName, context.toString());
this.parentContext = context;
}

Expand All @@ -100,11 +102,12 @@ public void configure(Context context) {
@Override
public void start() {
if (getChannel() == null) {
logger.error("{}'s channel is null", this.getName());
logger.error("{}'s channel is null", this.cachedSinkName);
}
cachedMsgChannel = getChannel();
try {
ConfigManager.getInstance().regMetaConfigChgCallback(this);
this.context = new MessageQueueZoneSinkContext(this, parentContext, getChannel());
this.context = new MessageQueueZoneSinkContext(this, parentContext, cachedMsgChannel);
this.context.start();
this.dispatchManager = new BatchPackManager(this, parentContext);
this.scheduledPool = Executors.newScheduledThreadPool(2);
Expand All @@ -122,7 +125,7 @@ public void run() {
this.zoneProducer.start();
// start configure change listener thread
this.configListener = new Thread(new ConfigChangeProcessor());
this.configListener.setName(getName() + "-configure-listener");
this.configListener.setName(this.cachedSinkName + "-configure-listener");
this.configListener.start();
// create worker
MessageQueueZoneWorker zoneWorker;
Expand All @@ -133,7 +136,7 @@ public void run() {
this.workers.add(zoneWorker);
}
} catch (Exception e) {
logger.error("{} start failure", this.getName(), e);
logger.error("{} start failure", this.cachedSinkName, e);
}
super.start();
}
Expand All @@ -159,7 +162,7 @@ public void stop() {
try {
worker.close();
} catch (Throwable e) {
logger.error("{} stop Zone worker failure", this.getName(), e);
logger.error("{} stop Zone worker failure", this.cachedSinkName, e);
}
}
this.context.close();
Expand All @@ -185,11 +188,10 @@ public Status process() throws EventDeliveryException {
}
}
this.dispatchManager.outputOvertimeData();
Channel channel = getChannel();
Transaction tx = channel.getTransaction();
Transaction tx = cachedMsgChannel.getTransaction();
tx.begin();
try {
Event event = channel.take();
Event event = cachedMsgChannel.take();
// no data
if (event == null) {
tx.commit();
Expand Down Expand Up @@ -248,20 +250,25 @@ public Status process() throws EventDeliveryException {
} catch (Throwable t) {
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_TAKE_FAILURE);
if (logCounter.shouldPrint()) {
logger.error("{} process event failed!", this.getName(), t);
logger.error("{} process event failed!", this.cachedSinkName, t);
}
try {
tx.rollback();
} catch (Throwable e) {
if (logCounter.shouldPrint()) {
logger.error("{} channel take transaction rollback exception", this.getName(), e);
logger.error("{} channel take transaction rollback exception", this.cachedSinkName, e);
}
}
return Status.BACKOFF;
} finally {
tx.close();
}
}

public String getCachedSinkName() {
return cachedSinkName;
}

public boolean isMqClusterStarted() {
return mqClusterStarted;
}
Expand Down Expand Up @@ -325,13 +332,13 @@ private class ConfigChangeProcessor implements Runnable {
@Override
public void run() {
long lastCheckTime;
logger.info("{} config-change processor start!", getName());
logger.info("{} config-change processor start!", cachedSinkName);
while (!isShutdown) {
reentrantLock.lock();
try {
condition.await();
} catch (InterruptedException e1) {
logger.info("{} config-change processor meet interrupt, break!", getName());
logger.info("{} config-change processor meet interrupt, break!", cachedSinkName);
break;
} finally {
reentrantLock.unlock();
Expand All @@ -344,7 +351,7 @@ public void run() {
zoneProducer.reloadMetaConfig();
} while (lastCheckTime != lastNotifyTime.get());
}
logger.info("{} config-change processor exit!", getName());
logger.info("{} config-change processor exit!", cachedSinkName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class MessageQueueZoneSinkContext extends SinkContext {
* Constructor
*/
public MessageQueueZoneSinkContext(MessageQueueZoneSink mqZoneSink, Context context, Channel channel) {
super(mqZoneSink.getName(), context, channel);
super(mqZoneSink.getCachedSinkName(), context, channel);
this.mqZoneSink = mqZoneSink;
// proxyClusterId
this.proxyClusterId = CommonConfigHolder.getInstance().getClusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public MessageQueueZoneWorker(MessageQueueZoneSink mqZoneSink, int workerIndex,
long fetchWaitMs, MessageQueueZoneProducer zoneProducer) {
super();
this.mqZoneSink = mqZoneSink;
this.workerName = mqZoneSink.getName() + "-worker-" + workerIndex;
this.workerName = mqZoneSink.getCachedSinkName() + "-worker-" + workerIndex;
this.fetchWaitMs = fetchWaitMs;
this.zoneProducer = zoneProducer;
this.status = LifecycleState.IDLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
Expand Down Expand Up @@ -78,6 +79,8 @@ public abstract class BaseSource
protected Context context;
// whether source reject service
protected volatile boolean isRejectService = false;
protected String cachedSrcName;
protected ChannelProcessor cachedChProcessor;
// source service host
protected String srcHost;
// source serviced port
Expand Down Expand Up @@ -133,7 +136,8 @@ public BaseSource() {

@Override
public void configure(Context context) {
logger.info("{} start to configure context:{}.", this.getName(), context.toString());
this.cachedSrcName = getName();
logger.info("{} start to configure context:{}.", this.cachedSrcName, context.toString());
this.context = context;
this.srcHost = getHostIp(context);
this.srcPort = getHostPort(context);
Expand Down Expand Up @@ -246,9 +250,10 @@ public synchronized void start() {
FailoverChannelProcessorHolder.setChannelProcessor(newProcessor);
}
super.start();
this.cachedChProcessor = getChannelProcessor();
// initial metric item set
this.metricItemSet = new DataProxyMetricItemSet(
CommonConfigHolder.getInstance().getClusterName(), getName(), String.valueOf(srcPort));
CommonConfigHolder.getInstance().getClusterName(), this.cachedSrcName, String.valueOf(srcPort));
MetricRegister.register(metricItemSet);
// init monitor logic
if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
Expand All @@ -258,25 +263,25 @@ public synchronized void start() {
this.monitorIndex.start();
this.monitorStats = new MonitorStats(
CommonConfigHolder.getInstance().getFileMetricEventOutName()
+ AttrConstants.SEP_HASHTAG + this.getName(),
+ AttrConstants.SEP_HASHTAG + this.cachedSrcName,
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
this.monitorStats.start();
}
startSource();
// register
AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE, this.getName(), this);
AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE, this.cachedSrcName, this);
}

@Override
public synchronized void stop() {
logger.info("[STOP {} SOURCE]{} stopping...", this.getProtocolName(), this.getName());
logger.info("[STOP {} SOURCE]{} stopping...", this.getProtocolName(), this.cachedSrcName);
// close channels
if (!allChannels.isEmpty()) {
try {
allChannels.close().awaitUninterruptibly();
} catch (Exception e) {
logger.warn("Close {} netty channels throw exception", this.getName(), e);
logger.warn("Close {} netty channels throw exception", this.cachedSrcName, e);
} finally {
allChannels.clear();
}
Expand All @@ -286,7 +291,7 @@ public synchronized void stop() {
try {
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.warn("Close {} channel future throw exception", this.getName(), e);
logger.warn("Close {} channel future throw exception", this.cachedSrcName, e);
}
}
// stop super class
Expand All @@ -307,7 +312,7 @@ public synchronized void stop() {
monitorStats.stop();
}
}
logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), this.getName());
logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), this.cachedSrcName);
}

@Override
Expand All @@ -334,7 +339,7 @@ public void update() {
}
}
logger.info("Source {} channel check, disconnects {} Illegal channels, waist {} ms",
getName(), cnt, (System.currentTimeMillis() - startTime));
this.cachedSrcName, cnt, (System.currentTimeMillis() - startTime));
}
}

Expand Down Expand Up @@ -435,7 +440,7 @@ private void fileMetricIncStats(StringBuilder strBuff, boolean isSucc, String gr
return;
}
String tenMinsDt = DateTimeUtils.ms2yyyyMMddHHmmTenMins(dt);
strBuff.append(getName()).append(AttrConstants.SEP_HASHTAG)
strBuff.append(this.cachedSrcName).append(AttrConstants.SEP_HASHTAG)
.append(groupId).append(AttrConstants.SEP_HASHTAG)
.append(streamId).append(AttrConstants.SEP_HASHTAG)
.append(topicName).append(AttrConstants.SEP_HASHTAG)
Expand Down Expand Up @@ -464,7 +469,7 @@ private void fileMetricIncStats(StringBuilder strBuff, boolean isSucc, String gr
public void addMetric(boolean result, long size, Event event) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, CommonConfigHolder.getInstance().getClusterName());
dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, getName());
dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, this.cachedSrcName);
dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, getStrPort());
DataProxyMetricItem.fillInlongId(event, dimensions);
DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
Expand All @@ -486,7 +491,7 @@ public void addMetric(boolean result, long size, Event event) {
*/
public ChannelInitializer getChannelInitializerFactory() {
ChannelInitializer fac = null;
logger.info(this.getName() + " load msgFactory=" + msgFactoryName);
logger.info(this.cachedSrcName + " load msgFactory=" + msgFactoryName);
try {
Class<? extends ChannelInitializer> clazz =
(Class<? extends ChannelInitializer>) Class.forName(msgFactoryName);
Expand All @@ -495,7 +500,7 @@ public ChannelInitializer getChannelInitializerFactory() {
fac = (ChannelInitializer) ctor.newInstance(this);
} catch (Exception e) {
logger.error("{} start error, fail to construct ChannelPipelineFactory with name {}",
this.getName(), msgFactoryName, e);
this.cachedSrcName, msgFactoryName, e);
stop();
throw new FlumeException(e.getMessage());
}
Expand Down Expand Up @@ -531,6 +536,14 @@ public boolean isRejectService() {
return isRejectService;
}

public String getCachedSrcName() {
return cachedSrcName;
}

public ChannelProcessor getCachedChProcessor() {
return cachedChProcessor;
}

/**
* getHostIp
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
(ChannelInboundHandlerAdapter) ctor.newInstance(source);
ch.pipeline().addLast("messageHandler", messageHandler);
} catch (Exception e) {
LOG.error("{} newInstance {} failure!", source.getName(),
LOG.error("{} newInstance {} failure!", source.getCachedSrcName(),
source.getMessageHandlerName(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,16 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().disconnect();
ctx.channel().close();
logger.warn("{} refuse to connect = {} , connections = {}, maxConnections = {}",
source.getName(), ctx.channel(), source.getAllChannels().size(), source.getMaxConnections());
source.getCachedSrcName(), ctx.channel(), source.getAllChannels().size(),
source.getMaxConnections());
return;
}
// add legal channel
source.getAllChannels().add(ctx.channel());
ctx.fireChannelActive();
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKIN);
logger.info("{} added new channel {}, current connections = {}, maxConnections = {}",
source.getName(), ctx.channel(), source.getAllChannels().size(), source.getMaxConnections());
source.getCachedSrcName(), ctx.channel(), source.getAllChannels().size(), source.getMaxConnections());
}

@Override
Expand All @@ -227,7 +228,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
if (logCounter.shouldPrint()) {
logger.warn("{} received an exception from channel {}",
source.getName(), ctx.channel(), cause);
source.getCachedSrcName(), ctx.channel(), cause);
}
if (ctx.channel() != null) {
source.getAllChannels().remove(ctx.channel());
Expand Down Expand Up @@ -270,7 +271,7 @@ private void processV0Msg(Channel channel, ByteBuf cb, AbsV0MsgCodec msgCodec) t
// build InLong event.
Event event = msgCodec.encEventPackage(source, channel);
try {
source.getChannelProcessor().processEvent(event);
source.getCachedChProcessor().processEvent(event);
source.fileMetricAddSuccStats(strBuff, msgCodec.getGroupId(), msgCodec.getStreamId(),
msgCodec.getTopicName(), msgCodec.getStrRemoteIP(), msgCodec.getMsgProcType(),
msgCodec.getDataTimeMs(), msgCodec.getMsgPkgTime(), msgCodec.getMsgCount(), 1,
Expand Down Expand Up @@ -370,7 +371,7 @@ private void processAndWaitingSave(ChannelHandlerContext ctx,
ProxyPackEvent packEvent = new ProxyPackEvent(inlongGroupId, inlongStreamId, events, callback);
// put to channel
try {
source.getChannelProcessor().processEvent(packEvent);
source.getCachedChProcessor().processEvent(packEvent);
events.forEach(event -> {
source.addMetric(true, event.getBody().length, event);
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
Expand Down Expand Up @@ -416,7 +417,7 @@ private void processAndResponse(ChannelHandlerContext ctx,
event.setTopic(topic);
// put to channel
try {
source.getChannelProcessor().processEvent(event);
source.getCachedChProcessor().processEvent(event);
source.addMetric(true, event.getBody().length, event);
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
} catch (Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ public void configure(Context context) {

@Override
public synchronized void startSource() {
logger.info("start " + this.getName());
logger.info("start " + this.getCachedSrcName());
// build accept group
this.acceptorGroup = new NioEventLoopGroup(maxAcceptThreads,
new DefaultThreadFactory(this.getName() + "-boss-group"));
new DefaultThreadFactory(this.getCachedSrcName() + "-boss-group"));
// build worker group
this.workerGroup = new NioEventLoopGroup(maxWorkerThreads,
new DefaultThreadFactory(this.getName() + "-worker-group"));
new DefaultThreadFactory(this.getCachedSrcName() + "-worker-group"));
// init boostrap
bootstrap = new ServerBootstrap();
if (conLinger >= 0) {
Expand All @@ -97,12 +97,12 @@ public synchronized void startSource() {
}
} catch (Exception e) {
logger.error("Source {} bind ({}:{}) error, program will exit! e = {}",
this.getName(), srcHost, srcPort, e);
this.getCachedSrcName(), srcHost, srcPort, e);
System.exit(-1);
}
ConfigManager.getInstance().addSourceReportInfo(
srcHost, String.valueOf(srcPort), getProtocolName().toUpperCase());
logger.info("Source {} started at ({}:{})!", this.getName(), srcHost, srcPort);
logger.info("Source {} started at ({}:{})!", this.getCachedSrcName(), srcHost, srcPort);
}

@Override
Expand Down
Loading

0 comments on commit 783426e

Please sign in to comment.