diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
index b5e5dc950..8ca6d4e60 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
@@ -3,6 +3,7 @@
/*-
* Copyright (C) 2020-2021 Confluent, Inc.
*/
+
import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.Builder;
import lombok.Getter;
@@ -12,6 +13,7 @@
import java.time.Duration;
import java.util.Objects;
+import java.util.function.Function;
import static io.confluent.csid.utils.StringUtils.msg;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;
@@ -143,6 +145,16 @@ public enum CommitMode {
@Builder.Default
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);
+ /**
+ * When present, use this to generate the retry delay, instad of {@link #getDefaultMessageRetryDelay()}.
+ *
+ * Overrides {@link #defaultMessageRetryDelay}, even if it's set.
+ */
+ @Builder.Default
+ private final Function retryDelayProvider;
+
+ public static Function retryDelayProviderStatic;
+
public void validate() {
Objects.requireNonNull(consumer, "A consumer must be supplied");
@@ -153,6 +165,7 @@ public void validate() {
//
WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay());
+ ParallelConsumerOptions.retryDelayProviderStatic = getRetryDelayProvider();
}
public boolean isUsingTransactionalProducer() {
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java
index dbb63ff86..d424f4406 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java
@@ -5,6 +5,7 @@
*/
import io.confluent.csid.utils.WallClock;
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -61,6 +62,9 @@ public class WorkContainer implements Comparable {
*/
private Duration retryDelay;
+ /**
+ * @see ParallelConsumerOptions#getDefaultMessageRetryDelay()
+ */
@Setter
static Duration defaultRetryDelay = Duration.ofSeconds(1);
@@ -123,10 +127,15 @@ private Temporal tryAgainAt(WallClock clock) {
}
public Duration getRetryDelay() {
- if (retryDelay == null)
- return defaultRetryDelay;
- else
- return retryDelay;
+ var retryDelayProvider = ParallelConsumerOptions.retryDelayProviderStatic;
+ if (retryDelayProvider != null) {
+ return retryDelayProvider.apply(this);
+ } else {
+ if (retryDelay == null)
+ return defaultRetryDelay;
+ else
+ return retryDelay;
+ }
}
@Override
@@ -193,4 +202,4 @@ public long offset() {
public boolean hasPreviouslyFailed() {
return getNumberOfFailedAttempts() > 0;
}
-}
\ No newline at end of file
+}