Skip to content

Commit

Permalink
[INLONG-8629][Agent] Fix sending invalid data to dataproxy failed blo…
Browse files Browse the repository at this point in the history
…cks normal data sending (#8630)
  • Loading branch information
justinwwhuang authored Aug 6, 2023
1 parent 2fc03b4 commit be94dce
Showing 1 changed file with 19 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,21 @@ public void Stop() {
shutdown = true;
resendExecutorService.shutdown();
sender.close();
cleanResendQueue();
}

private void cleanResendQueue() {
while (!resendQueue.isEmpty()) {
try {
AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS);
if (callback != null) {
MemoryManager.getInstance()
.release(AGENT_GLOBAL_WRITER_PERMIT, (int) callback.batchMessage.getTotalSize());
}
} catch (InterruptedException e) {
LOGGER.error("clean resend queue error{}", e.getMessage());
}
}
}

private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) {
Expand Down Expand Up @@ -222,6 +237,10 @@ private void sendBatchWithRetryCount(BatchProxyMessage batchMessage, int retry)
boolean suc = false;
while (!suc) {
try {
if (!resendQueue.isEmpty()) {
AgentUtils.silenceSleepInMs(retrySleepTime);
continue;
}
sender.asyncSendMessage(new AgentSenderCallback(batchMessage, retry),
batchMessage.getDataList(), batchMessage.getGroupId(), batchMessage.getStreamId(),
batchMessage.getDataTime(), SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS,
Expand Down

0 comments on commit be94dce

Please sign in to comment.