diff --git a/build.gradle b/build.gradle index 8abf1c7a7..d585faf3f 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.11.5' +version '0.11.6' def projName = "firehose" diff --git a/src/main/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetry.java b/src/main/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetry.java index 51c19d817..cdf220109 100644 --- a/src/main/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetry.java +++ b/src/main/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetry.java @@ -111,7 +111,7 @@ private List doRetry(List messages) throws IOException { firehoseInstrumentation.incrementCounter(RETRY_ATTEMPTS_TOTAL); firehoseInstrumentation.logInfo("Retrying messages attempt count: {}, Number of messages: {}", attemptCount, messages.size()); logDebug(retryMessages); - retryMessages = super.pushMessage(retryMessages); + retryMessages = errorHandler.split(super.pushMessage(retryMessages), ErrorScope.RETRY).get(Boolean.TRUE); backOff(retryMessages, attemptCount); attemptCount++; } diff --git a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java index 44d23f96c..badd9f20c 100644 --- a/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java +++ b/src/test/java/com/gotocompany/firehose/sinkdecorator/SinkWithRetryTest.java @@ -19,13 +19,10 @@ import org.mockito.Mockito; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; +import java.util.*; +import static com.gotocompany.firehose.metrics.Metrics.RETRY_ATTEMPTS_TOTAL; +import static com.gotocompany.firehose.metrics.Metrics.RETRY_MESSAGES_TOTAL; import static org.junit.Assert.*; import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; @@ -189,9 +186,9 @@ public void shouldAddInstrumentationForRetry() throws Exception { List messageList = sinkWithRetry.pushMessage(Collections.singletonList(message)); assertTrue(messageList.isEmpty()); verify(firehoseInstrumentation, times(1)).logInfo("Maximum retry attempts: {}", 3); - verify(firehoseInstrumentation, times(3)).captureMessageMetrics(Metrics.RETRY_MESSAGES_TOTAL, Metrics.MessageType.TOTAL, ErrorType.DESERIALIZATION_ERROR, 1); - verify(firehoseInstrumentation, times(2)).incrementCounter(Metrics.RETRY_ATTEMPTS_TOTAL); - verify(firehoseInstrumentation, times(1)).captureMessageMetrics(Metrics.RETRY_MESSAGES_TOTAL, Metrics.MessageType.SUCCESS, 3); + verify(firehoseInstrumentation, times(3)).captureMessageMetrics(RETRY_MESSAGES_TOTAL, Metrics.MessageType.TOTAL, ErrorType.DESERIALIZATION_ERROR, 1); + verify(firehoseInstrumentation, times(2)).incrementCounter(RETRY_ATTEMPTS_TOTAL); + verify(firehoseInstrumentation, times(1)).captureMessageMetrics(RETRY_MESSAGES_TOTAL, Metrics.MessageType.SUCCESS, 3); } @Test @@ -207,10 +204,10 @@ public void shouldAddInstrumentationForRetryFailures() throws Exception { List messageList = sinkWithRetry.pushMessage(Collections.singletonList(message)); assertFalse(messageList.isEmpty()); verify(firehoseInstrumentation, times(1)).logInfo("Maximum retry attempts: {}", 1); - verify(firehoseInstrumentation, times(3)).captureMessageMetrics(Metrics.RETRY_MESSAGES_TOTAL, Metrics.MessageType.TOTAL, ErrorType.DESERIALIZATION_ERROR, 1); - verify(firehoseInstrumentation, times(1)).incrementCounter(Metrics.RETRY_ATTEMPTS_TOTAL); - verify(firehoseInstrumentation, times(1)).captureMessageMetrics(Metrics.RETRY_MESSAGES_TOTAL, Metrics.MessageType.SUCCESS, 0); - verify(firehoseInstrumentation, times(3)).captureMessageMetrics(Metrics.RETRY_MESSAGES_TOTAL, Metrics.MessageType.FAILURE, ErrorType.DESERIALIZATION_ERROR, 1); + verify(firehoseInstrumentation, times(3)).captureMessageMetrics(RETRY_MESSAGES_TOTAL, Metrics.MessageType.TOTAL, ErrorType.DESERIALIZATION_ERROR, 1); + verify(firehoseInstrumentation, times(1)).incrementCounter(RETRY_ATTEMPTS_TOTAL); + verify(firehoseInstrumentation, times(1)).captureMessageMetrics(RETRY_MESSAGES_TOTAL, Metrics.MessageType.SUCCESS, 0); + verify(firehoseInstrumentation, times(3)).captureMessageMetrics(RETRY_MESSAGES_TOTAL, Metrics.MessageType.FAILURE, ErrorType.DESERIALIZATION_ERROR, 1); } @Test(expected = IOException.class) @@ -253,4 +250,73 @@ public void shouldRetryMessagesWhenErrorTypesConfigured() throws IOException { assertEquals(1, args.get(1).size()); assertEquals(messageWithError, args.get(1).get(0)); } + + @Test + public void shouldFilterOutNonRetryableErrorsFromRetryAttempts() throws IOException, DeserializerException { + Message retryableMessage = new Message("key1".getBytes(), "value1".getBytes(), "topic", 1, 1, + null, 0, 0, new ErrorInfo(null, ErrorType.DESERIALIZATION_ERROR)); + Message nonRetryableMessage = new Message("key2".getBytes(), "value2".getBytes(), "topic", 1, 2, + null, 0, 0, new ErrorInfo(null, ErrorType.SINK_UNKNOWN_ERROR)); + + ArrayList initialFailedMessages = new ArrayList<>(); + initialFailedMessages.add(retryableMessage); + initialFailedMessages.add(nonRetryableMessage); + + when(sinkDecorator.pushMessage(anyList())) + .thenReturn(initialFailedMessages) + .thenReturn(new ArrayList<>()); + + SinkWithRetry sinkWithRetry = new SinkWithRetry(sinkDecorator, backOffProvider, + firehoseInstrumentation, appConfig, parser, errorHandler); + + List messageList = sinkWithRetry.pushMessage(initialFailedMessages); + + assertEquals(1, messageList.size()); + assertEquals(ErrorType.SINK_UNKNOWN_ERROR, messageList.get(0).getErrorInfo().getErrorType()); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(List.class); + verify(sinkDecorator, times(2)).pushMessage(argumentCaptor.capture()); + List capturedArgs = argumentCaptor.getAllValues(); + + assertEquals(2, capturedArgs.get(0).size()); + assertEquals(1, capturedArgs.get(1).size()); + assertEquals(ErrorType.DESERIALIZATION_ERROR, + ((Message) capturedArgs.get(1).get(0)).getErrorInfo().getErrorType()); + } + + @Test + public void shouldStopRetryingWhenNonRetryableErrorOccurs() throws IOException, DeserializerException { + errorHandler = new ErrorHandler(ConfigFactory.create(ErrorConfig.class, new HashMap() {{ + put("ERROR_TYPES_FOR_RETRY", ErrorType.DESERIALIZATION_ERROR.name() + "," + ErrorType.SINK_RETRYABLE_ERROR.name()); + }})); + + Message message1 = new Message("key1".getBytes(), "value1".getBytes(), "topic", 1, 1); + Message message2 = new Message("key2".getBytes(), "value2".getBytes(), "topic", 1, 2); + ArrayList messages = new ArrayList<>(); + messages.add(message1); + messages.add(message2); + + ArrayList firstAttemptMessages = new ArrayList<>(); + firstAttemptMessages.add(new Message(message1, new ErrorInfo(new IOException(), ErrorType.SINK_RETRYABLE_ERROR))); + firstAttemptMessages.add(new Message(message2, new ErrorInfo(new IOException(), ErrorType.SINK_RETRYABLE_ERROR))); + + ArrayList secondAttemptMessages = new ArrayList<>(); + secondAttemptMessages.add(new Message(message1, new ErrorInfo(new IOException(), ErrorType.SINK_4XX_ERROR))); + secondAttemptMessages.add(new Message(message2, new ErrorInfo(new IOException(), ErrorType.SINK_4XX_ERROR))); + + when(sinkDecorator.pushMessage(anyList())) + .thenReturn(firstAttemptMessages) + .thenReturn(secondAttemptMessages); + + SinkWithRetry sinkWithRetry = new SinkWithRetry(sinkDecorator, backOffProvider, + firehoseInstrumentation, appConfig, parser, errorHandler); + + List finalFailedMessages = sinkWithRetry.pushMessage(messages); + + assertEquals(0, finalFailedMessages.size()); + verify(sinkDecorator, times(2)).pushMessage(anyList()); + verify(firehoseInstrumentation, times(1)).incrementCounter(RETRY_ATTEMPTS_TOTAL); + verify(firehoseInstrumentation, times(1)).logInfo("Retrying messages attempt count: {}, Number of messages: {}", 1, 2); + } + }