From be94dcef5597522b352506d86c36ad4a9e502793 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Sun, 6 Aug 2023 15:16:36 +0800 Subject: [PATCH] [INLONG-8629][Agent] Fix sending invalid data to dataproxy failed blocks normal data sending (#8630) --- .../agent/plugin/sinks/SenderManager.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java index 3447eb00588..554d6f59166 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java @@ -168,6 +168,21 @@ public void Stop() { shutdown = true; resendExecutorService.shutdown(); sender.close(); + cleanResendQueue(); + } + + private void cleanResendQueue() { + while (!resendQueue.isEmpty()) { + try { + AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS); + if (callback != null) { + MemoryManager.getInstance() + .release(AGENT_GLOBAL_WRITER_PERMIT, (int) callback.batchMessage.getTotalSize()); + } + } catch (InterruptedException e) { + LOGGER.error("clean resend queue error{}", e.getMessage()); + } + } } private AgentMetricItem getMetricItem(Map otherDimensions) { @@ -222,6 +237,10 @@ private void sendBatchWithRetryCount(BatchProxyMessage batchMessage, int retry) boolean suc = false; while (!suc) { try { + if (!resendQueue.isEmpty()) { + AgentUtils.silenceSleepInMs(retrySleepTime); + continue; + } sender.asyncSendMessage(new AgentSenderCallback(batchMessage, retry), batchMessage.getDataList(), batchMessage.getGroupId(), batchMessage.getStreamId(), batchMessage.getDataTime(), SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS,