Skip to content

Commit

Permalink
L-849 Limit max number of retries for sending logs (#19)
Browse files Browse the repository at this point in the history
* L-849 Limit max number of retries for sending logs

* rename errorLog to logger (errorLog.info() makes no sense)

* reset isFlushing when flush fails in LogtailSender
  • Loading branch information
PetrHeinz authored Jan 8, 2024
1 parent 605e479 commit 3ca4a4c
Showing 1 changed file with 97 additions and 24 deletions.
121 changes: 97 additions & 24 deletions src/main/java/com/logtail/logback/LogtailAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class LogtailAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
protected int batchInterval = 3000;
protected int connectTimeout = 5000;
protected int readTimeout = 10000;
protected int maxRetries = 5;
protected int retrySleepMilliseconds = 300;

protected PatternLayoutEncoder encoder;

Expand All @@ -55,7 +57,9 @@ public class LogtailAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
protected ScheduledExecutorService scheduledExecutorService;
protected ScheduledFuture<?> scheduledFuture;
protected ObjectMapper dataMapper;
protected Logger errorLog;
protected Logger logger;
protected int retrySize = 0;
protected int retries = 0;
protected boolean disabled = false;

protected ThreadFactory threadFactory = r -> {
Expand All @@ -66,7 +70,7 @@ public class LogtailAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
};

public LogtailAppender() {
errorLog = LoggerFactory.getLogger(LogtailAppender.class);
logger = LoggerFactory.getLogger(LogtailAppender.class);

dataMapper = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
Expand All @@ -85,9 +89,9 @@ protected void append(ILoggingEvent event) {
return;

if (this.ingestUrl.isEmpty() || this.sourceToken == null || this.sourceToken.isEmpty()) {
// Prevent potential dead-lock, when a blocking logger is configured - avoid using errorLog directly in append
// Prevent potential dead-lock, when a blocking logger is configured - avoid using logger directly in append
startThread("logtail-warning-logger", () -> {
errorLog.warn("Missing Source token for Better Stack - disabling LogtailAppender. Find out how to fix this at: https://betterstack.com/docs/logs/java ");
logger.warn("Missing Source token for Better Stack - disabling LogtailAppender. Find out how to fix this at: https://betterstack.com/docs/logs/java ");
});
this.disabled = true;
return;
Expand All @@ -99,9 +103,9 @@ protected void append(ILoggingEvent event) {

if (warnAboutMaxQueueSize && batch.size() == maxQueueSize) {
this.warnAboutMaxQueueSize = false;
// Prevent potential dead-lock, when a blocking logger is configured - avoid using errorLog directly in append
// Prevent potential dead-lock, when a blocking logger is configured - avoid using logger directly in append
startThread("logtail-error-logger", () -> {
errorLog.error("Maximum number of messages in queue reached ({}). New messages will be dropped.", maxQueueSize);
logger.error("Maximum number of messages in queue reached ({}). New messages will be dropped.", maxQueueSize);
});
}

Expand All @@ -123,38 +127,84 @@ protected void flush() {
if (batch.isEmpty())
return;

// Guaranteed to not be running concurrently
if (isFlushing.getAndSet(true))
return;

mustReflush = false;

int flushedSize = batch.size();
if (flushedSize > batchSize) {
flushedSize = batchSize;
mustReflush = true;
}
if (retries > 0 && flushedSize > retrySize) {
flushedSize = retrySize;
mustReflush = true;
}

if (!flushLogs(flushedSize)) {
mustReflush = true;
}

isFlushing.set(false);

if (mustReflush || batch.size() >= batchSize)
flush();
}

protected boolean flushLogs(int flushedSize) {
retrySize = flushedSize;

try {
int flushedSize = batch.size();
if (flushedSize > batchSize) {
flushedSize = batchSize;
mustReflush = true;
if (retries > maxRetries) {
batch.subList(0, flushedSize).clear();
logger.error("Dropped batch of {} logs.", flushedSize);
warnAboutMaxQueueSize = true;
retries = 0;

return true;
}

if (retries > 0) {
logger.info("Retrying to send {} logs to Better Stack ({} / {})", flushedSize, retries, maxRetries);
try {
TimeUnit.MILLISECONDS.sleep(retrySleepMilliseconds);
} catch (InterruptedException e) {
// Continue
}
}

LogtailResponse response = callHttpURLConnection(flushedSize);

if (response.getStatus() >= 200 && response.getStatus() < 300) {
batch.subList(0, flushedSize).clear();
this.warnAboutMaxQueueSize = true;
} else {
errorLog.error("Error calling Better Stack : {} ({})", response.getError(), response.getStatus());
mustReflush = true;
if (response.getStatus() >= 300 || response.getStatus() < 200) {
logger.error("Error calling Better Stack : {} ({})", response.getError(), response.getStatus());
retries++;

return false;
}

batch.subList(0, flushedSize).clear();
warnAboutMaxQueueSize = true;
retries = 0;

return true;

} catch (ConcurrentModificationException e) {
logger.error("Error clearing {} logs from batch, will retry immediately.", flushedSize, e);
retries = maxRetries; // No point in retrying to send the data

} catch (JsonProcessingException e) {
errorLog.error("Error processing JSON data : {}", e.getMessage(), e);
logger.error("Error processing JSON data : {}", e.getMessage(), e);
retries = maxRetries; // No point in retrying when batch cannot be processed into JSON

} catch (Exception e) {
errorLog.error("Error trying to call Better Stack : {}", e.getMessage(), e);
logger.error("Error trying to call Better Stack : {}", e.getMessage(), e);
}

isFlushing.set(false);
retries++;

if (mustReflush || batch.size() >= batchSize)
flush();
return false;
}

protected LogtailResponse callHttpURLConnection(int flushedSize) throws IOException {
Expand All @@ -163,7 +213,7 @@ protected LogtailResponse callHttpURLConnection(int flushedSize) throws IOExcept
try {
connection.connect();
} catch (Exception e) {
errorLog.error("Error trying to call Better Stack : {}", e.getMessage(), e);
logger.error("Error trying to call Better Stack : {}", e.getMessage(), e);
}

try (OutputStream os = connection.getOutputStream()) {
Expand Down Expand Up @@ -280,7 +330,7 @@ protected Object getMetaValue(String type, String value) {
return Boolean.valueOf(value);
}
} catch (NumberFormatException e) {
errorLog.error("Error getting meta value - {}", e.getMessage(), e);
logger.error("Error getting meta value - {}", e.getMessage(), e);
}

return value;
Expand All @@ -292,7 +342,10 @@ public void run() {
try {
flush();
} catch (Exception e) {
errorLog.error("Error trying to flush : {}", e.getMessage(), e);
logger.error("Error trying to flush : {}", e.getMessage(), e);
if (isFlushing.get()) {
isFlushing.set(false);
}
}
}
}
Expand Down Expand Up @@ -426,6 +479,26 @@ public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}

/**
* Sets the maximum number of retries for sending logs to Better Stack. After that, current batch of logs will be dropped.
*
* @param maxRetries
* max number of retries for sending logs
*/
public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}

/**
* Sets the number of milliseconds to sleep before retrying to send logs to Better Stack.
*
* @param retrySleepMilliseconds
* number of milliseconds to sleep before retry
*/
public void setRetrySleepMilliseconds(int retrySleepMilliseconds) {
this.retrySleepMilliseconds = retrySleepMilliseconds;
}

public void setEncoder(PatternLayoutEncoder encoder) {
this.encoder = encoder;
}
Expand Down

0 comments on commit 3ca4a4c

Please sign in to comment.