CC-7635: implement DLQ reporter for malformed doc records (confluentinc#421)

* CC-7635: implement DLQ reporter for malformed doc records

Signed-off-by: Lev Zemlyanov <[email protected]>
Lev Zemlyanov authored Jun 24, 2020
1 parent edf449b commit 9908a18
Showing 7 changed files with 155 additions and 57 deletions.
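For context on what this commit enables: the errant record reporter that the task now asks its context for is only non-null when the connector is configured with a dead letter queue, via the standard Kafka Connect errors.* settings rather than anything added here. A minimal sketch of such a configuration, built the same way the connector's tests build their properties; the topic name and replication factor are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector");
    props.put("errors.tolerance", "all");                                  // keep processing after record-level failures
    props.put("errors.deadletterqueue.topic.name", "dlq-elasticsearch");   // illustrative DLQ topic name
    props.put("errors.deadletterqueue.topic.replication.factor", "1");     // suitable for a single-broker dev cluster
    props.put("errors.deadletterqueue.context.headers.enable", "true");    // attach failure context as record headers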
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -9,7 +9,7 @@
<!-- switch statements on types exceed maximum complexity -->
<suppress
checks="(CyclomaticComplexity)"
files="(JestElasticsearchClient|Mapping).java"
files="(JestElasticsearchClient|Mapping|BulkProcessor).java"
/>

<!-- TODO: Undecided if this is too much -->
io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
@@ -91,7 +91,7 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
boolean dropInvalidMessage =
config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);
boolean createIndicesAtStartTime =
final boolean createIndicesAtStartTime =
config.getBoolean(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG);

DataConverter.BehaviorOnNullValues behaviorOnNullValues =
@@ -139,6 +139,18 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
.setBehaviorOnNullValues(behaviorOnNullValues)
.setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);

try {
if (context.errantRecordReporter() == null) {
log.info("Errant record reporter not configured.");
}

// may be null if DLQ not enabled
builder.setErrantRecordReporter(context.errantRecordReporter());
} catch (NoClassDefFoundError e) {
// Will occur in Connect runtimes earlier than 2.6
log.warn("AK versions prior to 2.6 do not support the errant record reporter");
}

this.createIndicesAtStartTime = createIndicesAtStartTime;

writer = builder.build();
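For reference, the API guarded by the try/catch above is org.apache.kafka.connect.sink.ErrantRecordReporter, which only exists in Connect runtimes from AK 2.6 onward (hence the NoClassDefFoundError handling). Its shape is roughly this single asynchronous method:

    import java.util.concurrent.Future;
    import org.apache.kafka.connect.sink.SinkRecord;

    public interface ErrantRecordReporter {
        // Asynchronously report a failed record (e.g. to the configured DLQ topic);
        // the returned future completes once the error has been reported.
        Future<Void> report(SinkRecord record, Throwable error);
    }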
io/confluent/connect/elasticsearch/ElasticsearchWriter.java
@@ -18,6 +18,7 @@
import io.confluent.connect.elasticsearch.bulk.BulkProcessor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +53,6 @@ public class ElasticsearchWriter {
private final DataConverter converter;

private final Set<String> existingMappings;
private final BehaviorOnMalformedDoc behaviorOnMalformedDoc;

ElasticsearchWriter(
ElasticsearchClient client,
@@ -72,7 +72,8 @@ public class ElasticsearchWriter {
long retryBackoffMs,
boolean dropInvalidMessage,
BehaviorOnNullValues behaviorOnNullValues,
BehaviorOnMalformedDoc behaviorOnMalformedDoc
BehaviorOnMalformedDoc behaviorOnMalformedDoc,
ErrantRecordReporter reporter
) {
this.client = client;
this.type = type;
@@ -85,7 +86,6 @@ public class ElasticsearchWriter {
this.dropInvalidMessage = dropInvalidMessage;
this.behaviorOnNullValues = behaviorOnNullValues;
this.converter = new DataConverter(useCompactMapEntries, behaviorOnNullValues);
this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;

bulkProcessor = new BulkProcessor<>(
new SystemTime(),
@@ -96,7 +96,8 @@ public class ElasticsearchWriter {
lingerMs,
maxRetries,
retryBackoffMs,
behaviorOnMalformedDoc
behaviorOnMalformedDoc,
reporter
);

existingMappings = new HashSet<>();
@@ -121,6 +122,7 @@ public static class Builder {
private boolean dropInvalidMessage;
private BehaviorOnNullValues behaviorOnNullValues = BehaviorOnNullValues.DEFAULT;
private BehaviorOnMalformedDoc behaviorOnMalformedDoc;
private ErrantRecordReporter reporter;

public Builder(ElasticsearchClient client) {
this.client = client;
@@ -210,6 +212,11 @@ public Builder setBehaviorOnMalformedDoc(BehaviorOnMalformedDoc behaviorOnMalfor
return this;
}

public Builder setErrantRecordReporter(ErrantRecordReporter reporter) {
this.reporter = reporter;
return this;
}

public ElasticsearchWriter build() {
return new ElasticsearchWriter(
client,
@@ -229,7 +236,8 @@ public ElasticsearchWriter build() {
retryBackoffMs,
dropInvalidMessage,
behaviorOnNullValues,
behaviorOnMalformedDoc
behaviorOnMalformedDoc,
reporter
);
}
}
@@ -315,7 +323,7 @@ record = converter.convertRecord(
sinkRecord.kafkaPartition(),
sinkRecord.kafkaOffset()
);
bulkProcessor.add(record, flushTimeoutMs);
bulkProcessor.add(record, sinkRecord, flushTimeoutMs);
}
}
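Taken together with the task change above, the new builder hook would be used along these lines; a sketch only, with the surrounding variable names (client, context, and the behavior settings) assumed rather than copied from this file:

    ElasticsearchWriter writer = new ElasticsearchWriter.Builder(client)
        .setBehaviorOnNullValues(behaviorOnNullValues)
        .setBehaviorOnMalformedDoc(behaviorOnMalformedDoc)
        .setErrantRecordReporter(context.errantRecordReporter())  // may be null when no DLQ is configured
        .build();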

io/confluent/connect/elasticsearch/bulk/BulkProcessor.java
@@ -19,9 +19,12 @@
import io.confluent.connect.elasticsearch.LogContext;
import io.confluent.connect.elasticsearch.RetryUtil;

import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -60,6 +63,7 @@ public class BulkProcessor<R, B> {
private final int maxRetries;
private final long retryBackoffMs;
private final BehaviorOnMalformedDoc behaviorOnMalformedDoc;
private final ErrantRecordReporter reporter;

private final Thread farmer;
private final ExecutorService executor;
@@ -73,6 +77,7 @@ public class BulkProcessor<R, B> {
// shared state, synchronized on (this), may be part of wait() conditions so need notifyAll() on
// changes
private final Deque<R> unsentRecords;
private final ConcurrentHashMap<R, SinkRecord> recordMap;
private int inFlightRecords = 0;
private final LogContext logContext = new LogContext();

@@ -85,7 +90,8 @@ public BulkProcessor(
long lingerMs,
int maxRetries,
long retryBackoffMs,
BehaviorOnMalformedDoc behaviorOnMalformedDoc
BehaviorOnMalformedDoc behaviorOnMalformedDoc,
ErrantRecordReporter reporter
) {
this.time = time;
this.bulkClient = bulkClient;
@@ -95,8 +101,10 @@ public BulkProcessor(
this.maxRetries = maxRetries;
this.retryBackoffMs = retryBackoffMs;
this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;
this.reporter = reporter;

unsentRecords = new ArrayDeque<>(maxBufferedRecords);
recordMap = new ConcurrentHashMap<>(maxBufferedRecords);

final ThreadFactory threadFactory = makeThreadFactory();
farmer = threadFactory.newThread(farmerTask());
@@ -303,7 +311,7 @@ public void throwIfTerminal() {
* <p>If any task has failed prior to or while blocked in the add, or if the timeout expires
* while blocked, {@link ConnectException} will be thrown.
*/
public synchronized void add(R record, long timeoutMs) {
public synchronized void add(R record, SinkRecord original, long timeoutMs) {
throwIfTerminal();

int numBufferedRecords = bufferedRecords();
@@ -336,6 +344,7 @@ public synchronized void add(R record, long timeoutMs) {
}

unsentRecords.addLast(record);
recordMap.put(record, original);
notifyAll();
}

@@ -453,6 +462,20 @@ private BulkResponse execute() throws Exception {
} else if (responseContainsMalformedDocError(bulkRsp)) {
retriable = bulkRsp.isRetriable();
handleMalformedDoc(bulkRsp);
if (reporter != null) {
for (R record : batch) {
SinkRecord original = recordMap.get(record);
if (original != null) {
reporter.report(
original,
new ReportingException(
"Bulk request failed: " + bulkRsp.getErrorInfo()
)
);
}
}
}
recordMap.keySet().removeAll(batch);
return bulkRsp;
} else {
// for all other errors, throw the error up
@@ -594,4 +617,26 @@ public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}

/**
 * Exception used to report errors from Elasticsearch
 * (mapper_parser_exception, illegal_argument_exception, and action_request_validation_exception)
 * caused by bad records via the AK 2.6 errant record reporter (DLQ) interface.
 * The stack trace is intentionally swallowed; see {@link #fillInStackTrace()}.
 */
public static class ReportingException extends RuntimeException {

public ReportingException(String message) {
super(message);
}

/**
 * This method is overridden to swallow the stack trace.
 *
 * @return this exception, without a populated stack trace
 */
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}
}
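Because fillInStackTrace() returns this without capturing any frames, a ReportingException is cheap to construct and carries an empty stack trace, which is the point of the override. A small illustration; the class is the one defined above, and nothing else is assumed:

    import io.confluent.connect.elasticsearch.bulk.BulkProcessor.ReportingException;

    public class ReportingExceptionDemo {
        public static void main(String[] args) {
            ReportingException e = new ReportingException("Bulk request failed: mapper_parser_exception");
            // The overridden fillInStackTrace() never captured any frames, so this prints 0.
            System.out.println(e.getStackTrace().length);
        }
    }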
io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java
@@ -15,12 +15,15 @@

package io.confluent.connect.elasticsearch;

import static org.mockito.Mockito.mock;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
@@ -68,6 +71,7 @@ public void testPutAndFlush() throws Exception {
Map<String, String> props = createProps();

ElasticsearchSinkTask task = new ElasticsearchSinkTask();
task.initialize(mock(SinkTaskContext.class));
task.start(props, client);
task.open(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));

@@ -99,6 +103,7 @@ public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
Map<String, String> props = createProps();

ElasticsearchSinkTask task = new ElasticsearchSinkTask();
task.initialize(mock(SinkTaskContext.class));

String key = "key";
Schema schema = createSchema();
@@ -132,6 +137,7 @@ public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
props.put(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG, "false");

ElasticsearchSinkTask task = new ElasticsearchSinkTask();
task.initialize(mock(SinkTaskContext.class));

String key = "key";
Schema schema = createSchema();
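The test changes above hand each task a mocked SinkTaskContext so that the new context.errantRecordReporter() lookup has something to call (a Mockito mock returns null for it by default, matching the "reporter not configured" path). A variant that supplies a mock reporter instead is sketched below; it is not part of this commit, and props and client stand for the fixtures these tests already build:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.kafka.connect.sink.ErrantRecordReporter;
    import org.apache.kafka.connect.sink.SinkTaskContext;

    SinkTaskContext context = mock(SinkTaskContext.class);
    ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
    when(context.errantRecordReporter()).thenReturn(reporter);  // instead of the default null

    ElasticsearchSinkTask task = new ElasticsearchSinkTask();
    task.initialize(context);
    task.start(props, client);  // records reported by BulkProcessor now reach the mock reporter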