+  private final ConcurrentMap<R, SinkRecord> recordMap;
private int inFlightRecords = 0;
private final LogContext logContext = new LogContext();
@@ -85,7 +90,8 @@ public BulkProcessor(
long lingerMs,
int maxRetries,
long retryBackoffMs,
- BehaviorOnMalformedDoc behaviorOnMalformedDoc
+ BehaviorOnMalformedDoc behaviorOnMalformedDoc,
+ ErrantRecordReporter reporter
) {
this.time = time;
this.bulkClient = bulkClient;
@@ -95,8 +101,10 @@ public BulkProcessor(
this.maxRetries = maxRetries;
this.retryBackoffMs = retryBackoffMs;
this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;
+ this.reporter = reporter;
unsentRecords = new ArrayDeque<>(maxBufferedRecords);
+ recordMap = new ConcurrentHashMap<>(maxBufferedRecords);
final ThreadFactory threadFactory = makeThreadFactory();
farmer = threadFactory.newThread(farmerTask());
@@ -303,7 +311,7 @@ public void throwIfTerminal() {
* If any task has failed prior to or while blocked in the add, or if the timeout expires
* while blocked, {@link ConnectException} will be thrown.
*/
- public synchronized void add(R record, long timeoutMs) {
+ public synchronized void add(R record, SinkRecord original, long timeoutMs) {
throwIfTerminal();
int numBufferedRecords = bufferedRecords();
@@ -336,6 +344,7 @@ public synchronized void add(R record, long timeoutMs) {
}
unsentRecords.addLast(record);
+ recordMap.put(record, original);
notifyAll();
}
@@ -453,6 +462,20 @@ private BulkResponse execute() throws Exception {
} else if (responseContainsMalformedDocError(bulkRsp)) {
retriable = bulkRsp.isRetriable();
handleMalformedDoc(bulkRsp);
+ if (reporter != null) {
+ for (R record : batch) {
+ SinkRecord original = recordMap.get(record);
+ if (original != null) {
+ reporter.report(
+ original,
+ new ReportingException(
+ "Bulk request failed: " + bulkRsp.getErrorInfo()
+ )
+ );
+ }
+ }
+ }
+ recordMap.keySet().removeAll(batch);
return bulkRsp;
} else {
// for all other errors, throw the error up
@@ -594,4 +617,26 @@ public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}
+
+ /**
+ * Exception used to report records that Elasticsearch rejected as malformed
+ * (mapper_parser_exception, illegal_argument_exception, and action_request_validation_exception)
+ * to the AK 2.6 errant record reporter (DLQ) interface. Its stack trace is
+ * suppressed, since the failure stems from the record itself rather than from connector code.
+ */
+ public static class ReportingException extends RuntimeException {
+
+ public ReportingException(String message) {
+ super(message);
+ }
+
+ /**
+ * Overridden to suppress the stack trace; see the class-level javadoc.
+ *
+ * @return this throwable, with no stack trace filled in
+ */
+ @Override
+ public synchronized Throwable fillInStackTrace() {
+ return this;
+ }
+ }
}
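
Note, for context (not part of this patch): the reporter handed to this
constructor comes from the task's SinkTaskContext. The sketch below uses the
real AK 2.6 API (KIP-610, SinkTaskContext#errantRecordReporter); the wrapper
class and field names are hypothetical, illustrating how a task might look the
reporter up defensively so the connector still runs on pre-2.6 runtimes.

import java.util.Map;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical sketch: obtain the DLQ reporter before building a BulkProcessor.
public abstract class ReporterLookupSketch extends SinkTask {
  protected ErrantRecordReporter reporter;

  @Override
  public void start(Map<String, String> props) {
    try {
      // Returns null when the connector has no error reporting configured.
      reporter = context.errantRecordReporter();
    } catch (NoSuchMethodError | NoClassDefFoundError e) {
      // Pre-2.6 Connect runtime without KIP-610; fall back to the
      // null-reporter path that execute() above already guards against.
      reporter = null;
    }
  }
}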
diff --git a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java
index 471439dd5..a585e9490 100644
--- a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java
+++ b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java
@@ -15,12 +15,15 @@
package io.confluent.connect.elasticsearch;
+import static org.mockito.Mockito.mock;
+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
@@ -68,6 +71,7 @@ public void testPutAndFlush() throws Exception {
Map<String, String> props = createProps();
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
+ task.initialize(mock(SinkTaskContext.class));
task.start(props, client);
task.open(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));
@@ -99,6 +103,7 @@ public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
Map<String, String> props = createProps();
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
+ task.initialize(mock(SinkTaskContext.class));
String key = "key";
Schema schema = createSchema();
@@ -132,6 +137,7 @@ public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
props.put(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG, "false");
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
+ task.initialize(mock(SinkTaskContext.class));
String key = "key";
Schema schema = createSchema();
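
Side note on the mocked context (an assumption, since the task's start() is not
shown in this diff): these tests now initialize the task with a mock
SinkTaskContext, presumably because start() consults the context for the errant
record reporter. An unstubbed Mockito mock returns null, so the tests above run
with no DLQ reporter. A sketch of how the reporting path could be exercised
instead (assumes static imports of Mockito's mock and when):

SinkTaskContext context = mock(SinkTaskContext.class);
ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
when(context.errantRecordReporter()).thenReturn(reporter);
task.initialize(context);
task.start(props, client);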
diff --git a/src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java b/src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java
index f30c5cb2a..2e61bd5a8 100644
--- a/src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java
+++ b/src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java
@@ -15,12 +15,14 @@
package io.confluent.connect.elasticsearch.bulk;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
@@ -37,9 +39,13 @@
import static org.junit.Assert.fail;
import static io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class BulkProcessorTest {
@@ -75,7 +81,7 @@ public boolean expectationsMet() {
}
@Override
- public BulkResponse execute(List<Integer> request) throws IOException {
+ public BulkResponse execute(List<Integer> request) {
final Expectation expectation;
try {
expectation = expectQ.remove();
@@ -120,22 +126,23 @@ public void batchingAndLingering() throws InterruptedException, ExecutionExcepti
lingerMs,
maxRetries,
retryBackoffMs,
- behaviorOnMalformedDoc
+ behaviorOnMalformedDoc,
+ null
);
final int addTimeoutMs = 10;
- bulkProcessor.add(1, addTimeoutMs);
- bulkProcessor.add(2, addTimeoutMs);
- bulkProcessor.add(3, addTimeoutMs);
- bulkProcessor.add(4, addTimeoutMs);
- bulkProcessor.add(5, addTimeoutMs);
- bulkProcessor.add(6, addTimeoutMs);
- bulkProcessor.add(7, addTimeoutMs);
- bulkProcessor.add(8, addTimeoutMs);
- bulkProcessor.add(9, addTimeoutMs);
- bulkProcessor.add(10, addTimeoutMs);
- bulkProcessor.add(11, addTimeoutMs);
- bulkProcessor.add(12, addTimeoutMs);
+ bulkProcessor.add(1, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(2, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(3, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(4, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(5, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(6, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(7, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(8, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(9, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(10, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(11, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(12, sinkRecord(), addTimeoutMs);
client.expect(Arrays.asList(1, 2, 3, 4, 5), BulkResponse.success());
client.expect(Arrays.asList(6, 7, 8, 9, 10), BulkResponse.success());
@@ -164,7 +171,8 @@ public void flushing() {
lingerMs,
maxRetries,
retryBackoffMs,
- behaviorOnMalformedDoc
+ behaviorOnMalformedDoc,
+ null
);
client.expect(Arrays.asList(1, 2, 3), BulkResponse.success());
@@ -172,9 +180,9 @@ public void flushing() {
bulkProcessor.start();
final int addTimeoutMs = 10;
- bulkProcessor.add(1, addTimeoutMs);
- bulkProcessor.add(2, addTimeoutMs);
- bulkProcessor.add(3, addTimeoutMs);
+ bulkProcessor.add(1, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(2, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(3, sinkRecord(), addTimeoutMs);
assertFalse(client.expectationsMet());
@@ -201,15 +209,16 @@ public void addBlocksWhenBufferFull() {
lingerMs,
maxRetries,
retryBackoffMs,
- behaviorOnMalformedDoc
+ behaviorOnMalformedDoc,
+ null
);
final int addTimeoutMs = 10;
- bulkProcessor.add(42, addTimeoutMs);
+ bulkProcessor.add(42, sinkRecord(), addTimeoutMs);
assertEquals(1, bulkProcessor.bufferedRecords());
try {
// BulkProcessor not started, so this add should timeout & throw
- bulkProcessor.add(43, addTimeoutMs);
+ bulkProcessor.add(43, sinkRecord(), addTimeoutMs);
fail();
} catch (ConnectException good) {
}
@@ -238,18 +247,19 @@ public void retriableErrors() throws InterruptedException, ExecutionException {
lingerMs,
maxRetries,
retryBackoffMs,
- behaviorOnMalformedDoc
+ behaviorOnMalformedDoc,
+ null
);
final int addTimeoutMs = 10;
- bulkProcessor.add(42, addTimeoutMs);
- bulkProcessor.add(43, addTimeoutMs);
+ bulkProcessor.add(42, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(43, sinkRecord(), addTimeoutMs);
assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded);
}
@Test
- public void retriableErrorsHitMaxRetries() throws InterruptedException, ExecutionException {
+ public void retriableErrorsHitMaxRetries() throws InterruptedException {
final int maxBufferedRecords = 100;
final int maxInFlightBatches = 5;
final int batchSize = 2;
@@ -272,12 +282,13 @@ public void retriableErrorsHitMaxRetries() throws InterruptedException, Executio
lingerMs,
maxRetries,
retryBackoffMs,
- behaviorOnMalformedDoc
+ behaviorOnMalformedDoc,
+ null
);
final int addTimeoutMs = 10;
- bulkProcessor.add(42, addTimeoutMs);
- bulkProcessor.add(43, addTimeoutMs);
+ bulkProcessor.add(42, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(43, sinkRecord(), addTimeoutMs);
try {
bulkProcessor.submitBatchWhenReady().get();
@@ -309,12 +320,13 @@ public void unretriableErrors() throws InterruptedException {
lingerMs,
maxRetries,
retryBackoffMs,
- behaviorOnMalformedDoc
+ behaviorOnMalformedDoc,
+ null
);
final int addTimeoutMs = 10;
- bulkProcessor.add(42, addTimeoutMs);
- bulkProcessor.add(43, addTimeoutMs);
+ bulkProcessor.add(42, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(43, sinkRecord(), addTimeoutMs);
try {
bulkProcessor.submitBatchWhenReady().get();
@@ -325,7 +337,7 @@ public void unretriableErrors() throws InterruptedException {
}
@Test
- public void failOnMalformedDoc() throws InterruptedException {
+ public void failOnMalformedDoc() {
final int maxBufferedRecords = 100;
final int maxInFlightBatches = 5;
final int batchSize = 2;
@@ -348,13 +360,14 @@ public void failOnMalformedDoc() throws InterruptedException {
lingerMs,
maxRetries,
retryBackoffMs,
- behaviorOnMalformedDoc
+ behaviorOnMalformedDoc,
+ null
);
bulkProcessor.start();
- bulkProcessor.add(42, 1);
- bulkProcessor.add(43, 1);
+ bulkProcessor.add(42, sinkRecord(), 1);
+ bulkProcessor.add(43, sinkRecord(), 1);
try {
final int flushTimeoutMs = 1000;
@@ -367,13 +380,14 @@ public void failOnMalformedDoc() throws InterruptedException {
}
@Test
- public void ignoreOrWarnOnMalformedDoc() throws InterruptedException {
+ public void ignoreOrWarnOnMalformedDoc() {
final int maxBufferedRecords = 100;
final int maxInFlightBatches = 5;
final int batchSize = 2;
final int lingerMs = 5;
final int maxRetries = 3;
final int retryBackoffMs = 1;
+ final ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
// Test both IGNORE and WARN options
// There is no difference in logic between IGNORE and WARN, except for the logging.
@@ -398,13 +412,14 @@ public void ignoreOrWarnOnMalformedDoc() throws InterruptedException {
lingerMs,
maxRetries,
retryBackoffMs,
- behaviorOnMalformedDoc
+ behaviorOnMalformedDoc,
+ reporter
);
bulkProcessor.start();
- bulkProcessor.add(42, 1);
- bulkProcessor.add(43, 1);
+ bulkProcessor.add(42, sinkRecord(), 1);
+ bulkProcessor.add(43, sinkRecord(), 1);
try {
final int flushTimeoutMs = 1000;
@@ -413,6 +428,8 @@ public void ignoreOrWarnOnMalformedDoc() throws InterruptedException {
fail(e.getMessage());
}
}
+
+ verify(reporter, times(4)).report(eq(sinkRecord()), any());
}
@Test
@@ -437,12 +454,13 @@ public void farmerTaskPropogatesException() {
lingerMs,
maxRetries,
retryBackoffMs,
- behaviorOnMalformedDoc
+ behaviorOnMalformedDoc,
+ null
);
final int addTimeoutMs = 10;
- bulkProcessor.add(42, addTimeoutMs);
- bulkProcessor.add(43, addTimeoutMs);
+ bulkProcessor.add(42, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(43, sinkRecord(), addTimeoutMs);
Runnable farmer = bulkProcessor.farmerTask();
ConnectException e = assertThrows(
@@ -451,7 +469,7 @@ public void farmerTaskPropogatesException() {
}
@Test
- public void terminateRetriesWhenInterruptedInSleep() throws Exception {
+ public void terminateRetriesWhenInterruptedInSleep() {
final int maxBufferedRecords = 100;
final int maxInFlightBatches = 5;
final int batchSize = 2;
@@ -477,15 +495,20 @@ public void terminateRetriesWhenInterruptedInSleep() throws Exception {
lingerMs,
maxRetries,
retryBackoffMs,
- behaviorOnMalformedDoc
+ behaviorOnMalformedDoc,
+ null
);
final int addTimeoutMs = 10;
- bulkProcessor.add(42, addTimeoutMs);
- bulkProcessor.add(43, addTimeoutMs);
+ bulkProcessor.add(42, sinkRecord(), addTimeoutMs);
+ bulkProcessor.add(43, sinkRecord(), addTimeoutMs);
ExecutionException e = assertThrows(ExecutionException.class,
() -> bulkProcessor.submitBatchWhenReady().get());
assertThat(e.getMessage(), containsString("a retriable error"));
}
+
+ private static SinkRecord sinkRecord() {
+ return new SinkRecord("topic", 0, Schema.STRING_SCHEMA, "key", Schema.INT32_SCHEMA, 0, 0L);
+ }
}
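
A note on the verify(...) call above: matching with eq(sinkRecord()) works
because SinkRecord inherits a field-based equals() from ConnectRecord, so a
record freshly built by the helper compares equal to the records the processor
reported. A minimal illustration (assumes JUnit's assertEquals and
assertNotSame are statically imported):

SinkRecord a = sinkRecord();
SinkRecord b = sinkRecord();
assertEquals(a, b);   // value equality across separate instances
assertNotSame(a, b);  // they are nonetheless distinct objects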
diff --git a/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchSinkTaskIT.java b/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchSinkTaskIT.java
index fcf73e492..7a7637af7 100644
--- a/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchSinkTaskIT.java
+++ b/src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchSinkTaskIT.java
@@ -15,6 +15,8 @@
package io.confluent.connect.elasticsearch.integration;
+import static org.mockito.Mockito.mock;
+
import io.confluent.common.utils.IntegrationTest;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.ElasticsearchSinkTask;
@@ -27,6 +29,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,6 +45,7 @@ public class ElasticsearchSinkTaskIT extends ElasticsearchIntegrationTestBase {
public void beforeEach() {
MDC.put("connector.context", "[MyConnector|task1] ");
Map<String, String> props = createProps();
+ task.initialize(mock(SinkTaskContext.class));
task.start(props, client);
}