From 3ca4a4cc84cf9faa0a3197613ce866a3f50ec030 Mon Sep 17 00:00:00 2001 From: Petr Heinz Date: Mon, 8 Jan 2024 13:14:22 +0100 Subject: [PATCH] L-849 Limit max number of retries for sending logs (#19) * L-849 Limit max number of retries for sending logs * rename errorLog to logger (errorLog.info() makes no sense) * reset isFlushing when flush fails in LogtailSender --- .../com/logtail/logback/LogtailAppender.java | 121 ++++++++++++++---- 1 file changed, 97 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/logtail/logback/LogtailAppender.java b/src/main/java/com/logtail/logback/LogtailAppender.java index 10519d8..086db8a 100644 --- a/src/main/java/com/logtail/logback/LogtailAppender.java +++ b/src/main/java/com/logtail/logback/LogtailAppender.java @@ -42,6 +42,8 @@ public class LogtailAppender extends UnsynchronizedAppenderBase { protected int batchInterval = 3000; protected int connectTimeout = 5000; protected int readTimeout = 10000; + protected int maxRetries = 5; + protected int retrySleepMilliseconds = 300; protected PatternLayoutEncoder encoder; @@ -55,7 +57,9 @@ public class LogtailAppender extends UnsynchronizedAppenderBase { protected ScheduledExecutorService scheduledExecutorService; protected ScheduledFuture scheduledFuture; protected ObjectMapper dataMapper; - protected Logger errorLog; + protected Logger logger; + protected int retrySize = 0; + protected int retries = 0; protected boolean disabled = false; protected ThreadFactory threadFactory = r -> { @@ -66,7 +70,7 @@ public class LogtailAppender extends UnsynchronizedAppenderBase { }; public LogtailAppender() { - errorLog = LoggerFactory.getLogger(LogtailAppender.class); + logger = LoggerFactory.getLogger(LogtailAppender.class); dataMapper = new ObjectMapper() .setSerializationInclusion(JsonInclude.Include.NON_NULL) @@ -85,9 +89,9 @@ protected void append(ILoggingEvent event) { return; if (this.ingestUrl.isEmpty() || this.sourceToken == null || this.sourceToken.isEmpty()) { - // Prevent potential dead-lock, when a blocking logger is configured - avoid using errorLog directly in append + // Prevent potential dead-lock, when a blocking logger is configured - avoid using logger directly in append startThread("logtail-warning-logger", () -> { - errorLog.warn("Missing Source token for Better Stack - disabling LogtailAppender. Find out how to fix this at: https://betterstack.com/docs/logs/java "); + logger.warn("Missing Source token for Better Stack - disabling LogtailAppender. Find out how to fix this at: https://betterstack.com/docs/logs/java "); }); this.disabled = true; return; @@ -99,9 +103,9 @@ protected void append(ILoggingEvent event) { if (warnAboutMaxQueueSize && batch.size() == maxQueueSize) { this.warnAboutMaxQueueSize = false; - // Prevent potential dead-lock, when a blocking logger is configured - avoid using errorLog directly in append + // Prevent potential dead-lock, when a blocking logger is configured - avoid using logger directly in append startThread("logtail-error-logger", () -> { - errorLog.error("Maximum number of messages in queue reached ({}). New messages will be dropped.", maxQueueSize); + logger.error("Maximum number of messages in queue reached ({}). New messages will be dropped.", maxQueueSize); }); } @@ -123,38 +127,84 @@ protected void flush() { if (batch.isEmpty()) return; + // Guaranteed to not be running concurrently if (isFlushing.getAndSet(true)) return; mustReflush = false; + int flushedSize = batch.size(); + if (flushedSize > batchSize) { + flushedSize = batchSize; + mustReflush = true; + } + if (retries > 0 && flushedSize > retrySize) { + flushedSize = retrySize; + mustReflush = true; + } + + if (!flushLogs(flushedSize)) { + mustReflush = true; + } + + isFlushing.set(false); + + if (mustReflush || batch.size() >= batchSize) + flush(); + } + + protected boolean flushLogs(int flushedSize) { + retrySize = flushedSize; + try { - int flushedSize = batch.size(); - if (flushedSize > batchSize) { - flushedSize = batchSize; - mustReflush = true; + if (retries > maxRetries) { + batch.subList(0, flushedSize).clear(); + logger.error("Dropped batch of {} logs.", flushedSize); + warnAboutMaxQueueSize = true; + retries = 0; + + return true; + } + + if (retries > 0) { + logger.info("Retrying to send {} logs to Better Stack ({} / {})", flushedSize, retries, maxRetries); + try { + TimeUnit.MILLISECONDS.sleep(retrySleepMilliseconds); + } catch (InterruptedException e) { + // Continue + } } LogtailResponse response = callHttpURLConnection(flushedSize); - if (response.getStatus() >= 200 && response.getStatus() < 300) { - batch.subList(0, flushedSize).clear(); - this.warnAboutMaxQueueSize = true; - } else { - errorLog.error("Error calling Better Stack : {} ({})", response.getError(), response.getStatus()); - mustReflush = true; + if (response.getStatus() >= 300 || response.getStatus() < 200) { + logger.error("Error calling Better Stack : {} ({})", response.getError(), response.getStatus()); + retries++; + + return false; } + + batch.subList(0, flushedSize).clear(); + warnAboutMaxQueueSize = true; + retries = 0; + + return true; + + } catch (ConcurrentModificationException e) { + logger.error("Error clearing {} logs from batch, will retry immediately.", flushedSize, e); + retries = maxRetries; // No point in retrying to send the data + } catch (JsonProcessingException e) { - errorLog.error("Error processing JSON data : {}", e.getMessage(), e); + logger.error("Error processing JSON data : {}", e.getMessage(), e); + retries = maxRetries; // No point in retrying when batch cannot be processed into JSON } catch (Exception e) { - errorLog.error("Error trying to call Better Stack : {}", e.getMessage(), e); + logger.error("Error trying to call Better Stack : {}", e.getMessage(), e); } - isFlushing.set(false); + retries++; - if (mustReflush || batch.size() >= batchSize) - flush(); + return false; } protected LogtailResponse callHttpURLConnection(int flushedSize) throws IOException { @@ -163,7 +213,7 @@ protected LogtailResponse callHttpURLConnection(int flushedSize) throws IOExcept try { connection.connect(); } catch (Exception e) { - errorLog.error("Error trying to call Better Stack : {}", e.getMessage(), e); + logger.error("Error trying to call Better Stack : {}", e.getMessage(), e); } try (OutputStream os = connection.getOutputStream()) { @@ -280,7 +330,7 @@ protected Object getMetaValue(String type, String value) { return Boolean.valueOf(value); } } catch (NumberFormatException e) { - errorLog.error("Error getting meta value - {}", e.getMessage(), e); + logger.error("Error getting meta value - {}", e.getMessage(), e); } return value; @@ -292,7 +342,10 @@ public void run() { try { flush(); } catch (Exception e) { - errorLog.error("Error trying to flush : {}", e.getMessage(), e); + logger.error("Error trying to flush : {}", e.getMessage(), e); + if (isFlushing.get()) { + isFlushing.set(false); + } } } } @@ -426,6 +479,26 @@ public void setReadTimeout(int readTimeout) { this.readTimeout = readTimeout; } + /** + * Sets the maximum number of retries for sending logs to Better Stack. After that, current batch of logs will be dropped. + * + * @param maxRetries + * max number of retries for sending logs + */ + public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + + /** + * Sets the number of milliseconds to sleep before retrying to send logs to Better Stack. + * + * @param retrySleepMilliseconds + * number of milliseconds to sleep before retry + */ + public void setRetrySleepMilliseconds(int retrySleepMilliseconds) { + this.retrySleepMilliseconds = retrySleepMilliseconds; + } + public void setEncoder(PatternLayoutEncoder encoder) { this.encoder = encoder; }