Skip to content

Commit

Permalink
[INLONG-11522][Agent] Strictly process new instances in the order of …
Browse files Browse the repository at this point in the history
…submission (#11523)
  • Loading branch information
justinwwhuang authored Nov 21, 2024
1 parent 3c5f523 commit 52505bc
Showing 1 changed file with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class InstanceManager extends AbstractDaemon {
private final ConcurrentHashMap<String, Instance> instanceMap;
// instance profile queue.
private final BlockingQueue<InstanceAction> actionQueue;
private final BlockingQueue<InstanceAction> addActionQueue;
// task thread pool;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
Expand Down Expand Up @@ -125,6 +126,7 @@ public InstanceManager(String taskId, int instanceLimit, Store basicStore, TaskS
instanceMap = new ConcurrentHashMap<>();
this.instanceLimit = instanceLimit;
actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
addActionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
}

public String getTaskId() {
Expand Down Expand Up @@ -167,6 +169,7 @@ private Runnable coreThread() {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
printInstanceState();
dealWithActionQueue();
dealWithAddActionQueue();
keepPaceWithStore();
String inlongGroupId = taskFromStore.getInlongGroupId();
String inlongStreamId = taskFromStore.getInlongStreamId();
Expand Down Expand Up @@ -263,7 +266,9 @@ private void dealWithActionQueue() {
}
switch (action.getActionType()) {
case ADD:
addInstance(action.getProfile());
if (!addActionQueue.offer(action)) {
LOGGER.error("it should never happen: addQueue is full");
}
break;
case FINISH:
finishInstance(action.getProfile());
Expand All @@ -282,6 +287,20 @@ private void dealWithActionQueue() {
}
}

private void dealWithAddActionQueue() {
while (isRunnable()) {
if (instanceMap.size() > instanceLimit) {
LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit);
return;
}
InstanceAction action = addActionQueue.poll();
if (action == null) {
break;
}
addInstance(action.getProfile());
}
}

@Override
public void start() {
restoreFromStore();
Expand Down Expand Up @@ -320,12 +339,6 @@ private void restoreFromStore() {
}

private void addInstance(InstanceProfile profile) {
if (instanceMap.size() > instanceLimit) {
LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit);
actionQueue.offer(new InstanceAction(ActionType.ADD, profile));
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
return;
}
LOGGER.info("add instance taskId {} instanceId {}", taskId, profile.getInstanceId());
if (!shouldAddAgain(profile.getInstanceId(), profile.getFileUpdateTime())) {
LOGGER.info("addInstance shouldAddAgain returns false skip taskId {} instanceId {}", taskId,
Expand Down Expand Up @@ -474,7 +487,7 @@ public boolean shouldAddAgain(String fileName, long lastModifyTime) {
}

public boolean isFull() {
return (instanceMap.size() + actionQueue.size()) >= instanceLimit * reserveCoefficient;
return (actionQueue.size() + addActionQueue.size()) >= ACTION_QUEUE_CAPACITY * reserveCoefficient;
}

public long getFinishedInstanceCount() {
Expand Down

0 comments on commit 52505bc

Please sign in to comment.