+final Map<String, Boolean> upMap = new ConcurrentHashMap<>();
+
+pc.poll(consumerRecord -> {
+    String serverId = extractServerId(consumerRecord);
+    boolean up = upMap.computeIfAbsent(serverId, ignore -> true);
+
+    if (!up) {
+        up = updateStatusOfServer(serverId);
+    }
+
+    if (up) {
+        try {
+            processRecord(consumerRecord);
+            // no exception, so the server is up - record the status
+            upMap.put(serverId, true);
+        } catch (CircuitBreakingException e) {
+            log.warn("Server {} is circuit broken, will retry message when server is up. Record: {}", serverId, consumerRecord);
+            upMap.put(serverId, false);
+        }
+    } else {
+        log.warn("Server {} currently down, will retry record later: {}", serverId, consumerRecord);
+    }
+});
+----
+
+=== Head of Line Blocking
+
+To prevent a failing record from blocking the progress of its partition, an ordering mode other than `PARTITION` must be used, so that the system can continue processing other records - those with different keys in `KEY` ordering, or any record at all in `UNORDERED` mode.
+This is because in `PARTITION` ordering mode, records are always processed strictly in partition order, so head of line blocking cannot be avoided.
+
+=== Future Work
+
+Improvements to this system are planned - see the following issues:
+
+* https://github.com/confluentinc/parallel-consumer/issues/65[Enhanced retry epic #65]
+* https://github.com/confluentinc/parallel-consumer/issues/48[Support scheduled message processing (scheduled retry) #48]
+* https://github.com/confluentinc/parallel-consumer/issues/196[Provide option for max retries, and a callback when reached (potential DLQ) #196]
+* https://github.com/confluentinc/parallel-consumer/issues/34[Monitor for progress and optionally shutdown (leave consumer group), skip message or send to DLQ #34]
+
== Result Models
* Void
@@ -659,6 +777,7 @@ When your function is actually run, a result object will be streamed back to you
After your operation completes, you can also choose to publish a result message back to Kafka.
The message publishing metadata can be streamed back to your client code.
+[[commit-mode]]
== Commit Mode
The system gives you three choices for how to do offset commits.
@@ -668,7 +787,7 @@ The `transactional` mode is explained in the next section.
`Asynchronous` mode is faster, as it doesn't block the control loop.
-`Synchronous` will block the processing loop until a successful commit response is received, however, `Asynchronous` will still be capped by the max processing settings in the `Options` class.
+`Synchronous` will block the processing loop until a successful commit response is received, however, `Asynchronous` will still be capped by the max processing settings in the `ParallelConsumerOptions` class.
If you're used to using the auto commit mode in the normal Kafka consumer, you can think of the `Asynchronous` mode as being similar to this.
We suggest starting with this mode, and it is the default.
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
index fcb1caf08..3d09af427 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
@@ -1,9 +1,8 @@
package io.confluent.parallelconsumer;
/*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/
-
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.Builder;
@@ -147,13 +146,17 @@ public enum CommitMode {
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);
/**
- * When present, use this to generate the retry delay, instad of {@link #getDefaultMessageRetryDelay()}.
+ * When present, use this to generate the retry delay, instead of {@link #getDefaultMessageRetryDelay()}.
*
* Overrides {@link #defaultMessageRetryDelay}, even if it's set.
*/
@Builder.Default
private final Function<WorkContainer, Duration> retryDelayProvider;
+ /**
+ * Dirty global access to the {@link #retryDelayProvider}.
+ */
+ // TODO remove need for writeable global access
public static Function<WorkContainer, Duration> retryDelayProviderStatic;
/**
diff --git a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java
index ab1522f01..99b6b5b10 100644
--- a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java
+++ b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java
@@ -1,14 +1,14 @@
package io.confluent.parallelconsumer.examples.core;
/*-
- * Copyright (C) 2020 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/
-
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.concurrent.CircuitBreakingException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -16,7 +16,10 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import java.time.Duration;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static pl.tlinkowski.unij.api.UniLists.of;
@@ -111,4 +114,82 @@ static class Result {
String payload;
}
+    void customRetryDelay() {
+        // tag::customRetryDelay[]
+        final double multiplier = 0.5;
+        final int baseDelaySecond = 1;
+
+        ParallelConsumerOptions.builder()
+                .retryDelayProvider(workContainer -> {
+                    int numberOfFailedAttempts = workContainer.getNumberOfFailedAttempts();
+                    // exponential back off - each failed attempt makes the delay 50% longer than the previous one
+                    long delayMillis = (long) (baseDelaySecond * Math.pow(1 + multiplier, numberOfFailedAttempts) * 1000);
+                    return Duration.ofMillis(delayMillis);
+                });
+        // end::customRetryDelay[]
+    }
+
+
+    void maxRetries() {
+        ParallelStreamProcessor<String, String> pc = ParallelStreamProcessor.createEosStreamProcessor(null);
+        // tag::maxRetries[]
+        final int maxRetries = 10;
+        final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();
+
+        pc.poll(consumerRecord -> {
+            Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);
+            if (retryCount < maxRetries) {
+                try {
+                    processRecord(consumerRecord);
+                    // no exception, so completed - remove from map
+                    retriesCount.remove(consumerRecord);
+                } catch (RuntimeException e) {
+                    // failed - record the attempt, then rethrow so the system will retry the record
+                    retriesCount.put(consumerRecord, retryCount + 1);
+                    throw e;
+                }
+            } else {
+                log.warn("Retry count {} exceeded max of {} for record {}", retryCount, maxRetries, consumerRecord);
+                // giving up, remove from map
+                retriesCount.remove(consumerRecord);
+            }
+        });
+        // end::maxRetries[]
+    }
+
+    private void processRecord(final ConsumerRecord<String, String> record) {
+        // no-op - placeholder for the user's real processing logic
+    }
+
+    void circuitBreaker() {
+        ParallelStreamProcessor<String, String> pc = ParallelStreamProcessor.createEosStreamProcessor(null);
+        // tag::circuitBreaker[]
+        final Map<String, Boolean> upMap = new ConcurrentHashMap<>();
+
+        pc.poll(consumerRecord -> {
+            String serverId = extractServerId(consumerRecord);
+            boolean up = upMap.computeIfAbsent(serverId, ignore -> true);
+
+            if (!up) {
+                up = updateStatusOfServer(serverId);
+            }
+
+            if (up) {
+                try {
+                    processRecord(consumerRecord);
+                    // no exception, so the server is up - record the status
+                    upMap.put(serverId, true);
+                } catch (CircuitBreakingException e) {
+                    log.warn("Server {} is circuit broken, will retry message when server is up. Record: {}", serverId, consumerRecord);
+                    upMap.put(serverId, false);
+                }
+            } else {
+                log.warn("Server {} currently down, will retry record later: {}", serverId, consumerRecord);
+            }
+        });
+        // end::circuitBreaker[]
+    }
+
+    private boolean updateStatusOfServer(final String serverId) {
+        // placeholder - in a real application, check the server's health here
+        return false;
+    }
+
+    private String extractServerId(final ConsumerRecord<String, String> consumerRecord) {
+        // placeholder - in a real application, derive the target server from the record
+        return null;
+    }
+
}
diff --git a/src/docs/README.adoc b/src/docs/README.adoc
index dcb2de8f8..66ff4a280 100644
--- a/src/docs/README.adoc
+++ b/src/docs/README.adoc
@@ -565,6 +565,80 @@ image::https://lucid.app/publicSegments/view/f7a05e99-24e6-4ea3-b3d0-978e306aa56
Even during retries, offsets will always be committed only after successful processing, and in order.
+== Retries
+
+If processing of a record fails, the record will be placed back into its queue and retried after a configurable delay (see the `ParallelConsumerOptions` class).
+Ordering guarantees will always be adhered to, regardless of failure.
+
+A failure is denoted by *any* exception being thrown from the user's processing function.
+The system catches these exceptions, logs them and places the record back in its queue to be processed again later.
+All types of exceptions thrown are considered retriable.
+To not retry a record, do not throw an exception from your processing function.
+
+If you want to proactively fail a record, without relying on some other system throwing an exception that you don't catch, simply throw an exception of your own design - the system will treat it the same way.
+
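+For illustration, a minimal sketch of this is below - `pc`, `processRecord` and `dependencyIsHealthy` are hypothetical stand-ins for your processor instance, your processing logic and your own health check:
+
+[source,java]
+----
+pc.poll(consumerRecord -> {
+    if (!dependencyIsHealthy()) {
+        // any exception thrown from the user function marks the record as failed,
+        // so the system will place it back in the queue and retry it later
+        throw new RuntimeException("Dependency offline, proactively failing record: " + consumerRecord);
+    }
+    processRecord(consumerRecord);
+});
+----
+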
+To configure the retry delay, see `ParallelConsumerOptions#defaultMessageRetryDelay`.
+
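+For example, a minimal sketch of setting a fixed five second delay through the options builder - this assumes the Lombok-generated builder exposes the field under the same name:
+
+[source,java]
+----
+ParallelConsumerOptions.builder()
+        .defaultMessageRetryDelay(Duration.ofSeconds(5)) // wait 5 seconds between retries, instead of the 1 second default
+        // ... plus your other options, such as the consumer
+        .build();
+----
+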
+At the moment there is no terminal error support, so messages will continue to be retried for as long as an exception continues to be thrown from the user function (see <<skipping-records>> for a workaround).
+However, this will not hold up the queues in `KEY` or `UNORDERED` ordering modes; in `PARTITION` mode it *will* block progress.
+Offsets will also continue to be committed (see <<commit-mode>>).
+
+=== Retry Delay Function
+
+As part of the https://github.com/confluentinc/parallel-consumer/issues/65[enhanced retry epic], the ability to https://github.com/confluentinc/parallel-consumer/issues/82[dynamically determine the retry delay] was added.
+This can be used to customise the retry delay per record - for example to implement exponential back off, to use different delays for different types of records, or to derive the delay from the status of an external system.
+
+You can access the retry count of a record through its wrapping `WorkContainer` class, which is the input variable to the retry delay function.
+
+.Example retry delay function implementing exponential backoff
+[source,java,indent=0]
+----
+include::{project_root}/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java[tag=customRetryDelay]
+----
+
+[[skipping-records]]
+=== Skipping Records
+
+If for whatever reason you want to skip a record, simply do not throw an exception - or catch any exception being thrown, log and swallow it, and return from the user function normally.
+The system will treat this as a processing success, mark the record as completed and move on as though it were a normal operation.
+
+A user may choose to skip a record, for example, if it has been retried too many times, is invalid, or simply doesn't need processing.
+
+Implementing a https://github.com/confluentinc/parallel-consumer/issues/196[max retries feature] as part of the system is planned.
+
+.Example of skipping a record after a maximum number of retries is reached
+[source,java,indent=0]
+----
+include::{project_root}/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java[tag=maxRetries]
+----
+
+=== Circuit Breaker Pattern
+
+Although the system doesn't have an https://github.com/confluentinc/parallel-consumer/issues/110[explicit circuit breaker pattern feature], one can be created by combining the custom retry delay function and proactive failure.
+For example, the retry delay can be calculated based upon the status of an external system - i.e. if the external system is currently out of action, use a longer retry delay.
+Then in the processing function, again check the status of the external system first, and if it's still offline, throw an exception proactively without attempting to process the message.
+This will put the message back in the queue.
+
+.Example of a circuit breaker implementation
+[source,java,indent=0]
+----
+include::{project_root}/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java[tag=circuitBreaker]
+----
+
+=== Head of Line Blocking
+
+To prevent a failing record from blocking the progress of its partition, an ordering mode other than `PARTITION` must be used, so that the system can continue processing other records - those with different keys in `KEY` ordering, or any record at all in `UNORDERED` mode.
+This is because in `PARTITION` ordering mode, records are always processed strictly in partition order, so head of line blocking cannot be avoided.
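+
+As a sketch, the ordering mode is selected when building the options - `kafkaConsumer` here is assumed to be your existing `KafkaConsumer` instance:
+
+[source,java]
+----
+ParallelConsumerOptions.builder()
+        .ordering(ParallelConsumerOptions.ProcessingOrder.KEY) // or UNORDERED - a failing record then only blocks records with the same key, not the whole partition
+        .consumer(kafkaConsumer)
+        .build();
+----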
+
+=== Future Work
+
+Improvements to this system are planned - see the following issues:
+
+* https://github.com/confluentinc/parallel-consumer/issues/65[Enhanced retry epic #65]
+* https://github.com/confluentinc/parallel-consumer/issues/48[Support scheduled message processing (scheduled retry) #48]
+* https://github.com/confluentinc/parallel-consumer/issues/196[Provide option for max retries, and a callback when reached (potential DLQ) #196]
+* https://github.com/confluentinc/parallel-consumer/issues/34[Monitor for progress and optionally shutdown (leave consumer group), skip message or send to DLQ #34]
+
== Result Models
* Void
@@ -580,6 +654,7 @@ When your function is actually run, a result object will be streamed back to you
After your operation completes, you can also choose to publish a result message back to Kafka.
The message publishing metadata can be streamed back to your client code.
+[[commit-mode]]
== Commit Mode
The system gives you three choices for how to do offset commits.
@@ -589,7 +664,7 @@ The `transactional` mode is explained in the next section.
`Asynchronous` mode is faster, as it doesn't block the control loop.
-`Synchronous` will block the processing loop until a successful commit response is received, however, `Asynchronous` will still be capped by the max processing settings in the `Options` class.
+`Synchronous` will block the processing loop until a successful commit response is received, however, `Asynchronous` will still be capped by the max processing settings in the `ParallelConsumerOptions` class.
If you're used to using the auto commit mode in the normal Kafka consumer, you can think of the `Asynchronous` mode as being similar to this.
We suggest starting with this mode, and it is the default.
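+
+For illustration, a sketch of selecting the commit mode through the options builder - the enum constants named below are assumed from the `CommitMode` enum in `ParallelConsumerOptions`:
+
+[source,java]
+----
+ParallelConsumerOptions.builder()
+        .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS) // the asynchronous default
+        // PERIODIC_CONSUMER_SYNC blocks the control loop until the commit completes; PERIODIC_TRANSACTIONAL_PRODUCER uses transactions
+        .consumer(kafkaConsumer)
+        .build();
+----
+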