From 52505bc739120fd8748f67da41b8befa0e99b5f0 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Thu, 21 Nov 2024 21:04:06 +0800 Subject: [PATCH] [INLONG-11522][Agent] Strictly process new instances in the order of submission (#11523) --- .../agent/core/instance/InstanceManager.java | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 2a27e792e8..1b9dba8191 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -65,6 +65,7 @@ public class InstanceManager extends AbstractDaemon { private final ConcurrentHashMap instanceMap; // instance profile queue. private final BlockingQueue actionQueue; + private final BlockingQueue addActionQueue; // task thread pool; private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, @@ -125,6 +126,7 @@ public InstanceManager(String taskId, int instanceLimit, Store basicStore, TaskS instanceMap = new ConcurrentHashMap<>(); this.instanceLimit = instanceLimit; actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY); + addActionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY); } public String getTaskId() { @@ -167,6 +169,7 @@ private Runnable coreThread() { AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS); printInstanceState(); dealWithActionQueue(); + dealWithAddActionQueue(); keepPaceWithStore(); String inlongGroupId = taskFromStore.getInlongGroupId(); String inlongStreamId = taskFromStore.getInlongStreamId(); @@ -263,7 +266,9 @@ private void dealWithActionQueue() { } switch (action.getActionType()) { case ADD: - addInstance(action.getProfile()); + if (!addActionQueue.offer(action)) { + LOGGER.error("it should never happen: addQueue is full"); + } break; case FINISH: finishInstance(action.getProfile()); @@ -282,6 +287,20 @@ private void dealWithActionQueue() { } } + private void dealWithAddActionQueue() { + while (isRunnable()) { + if (instanceMap.size() > instanceLimit) { + LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit); + return; + } + InstanceAction action = addActionQueue.poll(); + if (action == null) { + break; + } + addInstance(action.getProfile()); + } + } + @Override public void start() { restoreFromStore(); @@ -320,12 +339,6 @@ private void restoreFromStore() { } private void addInstance(InstanceProfile profile) { - if (instanceMap.size() > instanceLimit) { - LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit); - actionQueue.offer(new InstanceAction(ActionType.ADD, profile)); - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS); - return; - } LOGGER.info("add instance taskId {} instanceId {}", taskId, profile.getInstanceId()); if (!shouldAddAgain(profile.getInstanceId(), profile.getFileUpdateTime())) { LOGGER.info("addInstance shouldAddAgain returns false skip taskId {} instanceId {}", taskId, @@ -474,7 +487,7 @@ public boolean shouldAddAgain(String fileName, long lastModifyTime) { } public boolean isFull() { - return (instanceMap.size() + actionQueue.size()) >= instanceLimit * reserveCoefficient; + return (actionQueue.size() + addActionQueue.size()) >= ACTION_QUEUE_CAPACITY * reserveCoefficient; } public long getFinishedInstanceCount() {