Skip to content

Commit

Permalink
feature: confluentinc#65 Custom retry delay provider
Browse files Browse the repository at this point in the history
  • Loading branch information
astubbs committed Jul 23, 2021
1 parent 253f61f commit 87c1331
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 170 deletions.
Original file line number Diff line number Diff line change
@@ -1,165 +1,178 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
*/
import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

import java.time.Duration;
import java.util.Objects;

import static io.confluent.csid.utils.StringUtils.msg;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;

/**
* The options for the {@link ParallelEoSStreamProcessor} system.
*
* @see #builder()
* @see ParallelConsumerOptions.ParallelConsumerOptionsBuilder
*/
@Getter
@Builder(toBuilder = true)
@ToString
public class ParallelConsumerOptions<K, V> {

/**
* Required parameter for all use.
*/
private final Consumer<K, V> consumer;

/**
* Supplying a producer is only needed if using the produce flows.
*
* @see ParallelStreamProcessor
*/
private final Producer<K, V> producer;

/**
* Path to Managed executor service for Java EE
*/
@Builder.Default
private final String managedExecutorService = "java:comp/DefaultManagedExecutorService";

/**
* Path to Managed thread factory for Java EE
*/
@Builder.Default
private final String managedThreadFactory = "java:comp/DefaultManagedThreadFactory";

/**
* The ordering guarantee to use.
*/
public enum ProcessingOrder {

/**
* No ordering is guaranteed, not even partition order. Fastest. Concurrency is at most the max number of
* concurrency or max number of uncommitted messages, limited by the max concurrency or uncommitted settings.
*/
UNORDERED,

/**
* Process messages within a partition in order, but process multiple partitions in parallel. Similar to running
* more consumer for a topic. Concurrency is at most the number of partitions.
*/
PARTITION,

/**
* Process messages in key order. Concurrency is at most the number of unique keys in a topic, limited by the
* max concurrency or uncommitted settings.
*/
KEY
}

/**
* The type of commit to be made, with either a transactions configured Producer where messages produced are
* committed back to the Broker along with the offsets they originated from, or with the faster simpler Consumer
* offset system either synchronously or asynchronously
*/
public enum CommitMode {

/**
* Periodically commits through the Producer using transactions. Slowest of the options, but no duplicates in
* Kafka guaranteed (message replay may cause duplicates in external systems which is unavoidable with Kafka).
* <p>
* This is separate from using an IDEMPOTENT Producer, which can be used, along with {@link
* CommitMode#PERIODIC_CONSUMER_SYNC} or {@link CommitMode#PERIODIC_CONSUMER_ASYNCHRONOUS}.
*/
PERIODIC_TRANSACTIONAL_PRODUCER,

/**
* Periodically synchronous commits with the Consumer. Much faster than {@link
* #PERIODIC_TRANSACTIONAL_PRODUCER}. Slower but potentially less duplicates than {@link
* #PERIODIC_CONSUMER_ASYNCHRONOUS} upon replay.
*/
PERIODIC_CONSUMER_SYNC,

/**
* Periodically commits offsets asynchronously. The fastest option, under normal conditions will have few or no
* duplicates. Under failure recovery may have more duplicates than {@link #PERIODIC_CONSUMER_SYNC}.
*/
PERIODIC_CONSUMER_ASYNCHRONOUS

}

/**
* The {@link ProcessingOrder} type to use
*/
@Builder.Default
private final ProcessingOrder ordering = ProcessingOrder.KEY;

/**
* The {@link CommitMode} to be used
*/
@Builder.Default
private final CommitMode commitMode = CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS;

/**
* Controls the maximum degree of concurrency to occur. Used to limit concurrent calls to external systems to a
* maximum to prevent overloading them or to a degree, using up quotas.
* <p>
* A note on quotas - if your quota is expressed as maximum concurrent calls, this works well. If it's limited in
* total requests / sec, this may still overload the system. See towards the distributed rate limiting feature for
* this to be properly addressed: https://github.com/confluentinc/parallel-consumer/issues/24 Add distributed rate
* limiting support #24.
* <p>
* In the core module, this sets the number of threads to use in the core's thread pool.
* <p>
* It's recommended to set this quite high, much higher than core count, as it's expected that these threads will
* spend most of their time blocked waiting for IO. For automatic setting of this variable, look out for issue
* https://github.com/confluentinc/parallel-consumer/issues/21 Dynamic concurrency control with flow control or tcp
* congestion control theory #21.
*/
@Builder.Default
private final int maxConcurrency = 16;

/**
* When a message fails, how long the system should wait before trying that message again.
*/
@Builder.Default
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);

public void validate() {
Objects.requireNonNull(consumer, "A consumer must be supplied");

if (isUsingTransactionalProducer() && producer == null) {
throw new IllegalArgumentException(msg("Wanting to use Transaction Producer mode ({}) without supplying a Producer instance",
commitMode));
}

//
WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay());
}

public boolean isUsingTransactionalProducer() {
return commitMode.equals(PERIODIC_TRANSACTIONAL_PRODUCER);
}

public boolean isProducerSupplied() {
return getProducer() != null;
}
}
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
*/

import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

import java.time.Duration;
import java.util.Objects;
import java.util.function.Function;

import static io.confluent.csid.utils.StringUtils.msg;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;

/**
* The options for the {@link ParallelEoSStreamProcessor} system.
*
* @see #builder()
* @see ParallelConsumerOptions.ParallelConsumerOptionsBuilder
*/
@Getter
@Builder(toBuilder = true)
@ToString
public class ParallelConsumerOptions<K, V> {

/**
* Required parameter for all use.
*/
private final Consumer<K, V> consumer;

/**
* Supplying a producer is only needed if using the produce flows.
*
* @see ParallelStreamProcessor
*/
private final Producer<K, V> producer;

/**
* Path to Managed executor service for Java EE
*/
@Builder.Default
private final String managedExecutorService = "java:comp/DefaultManagedExecutorService";

/**
* Path to Managed thread factory for Java EE
*/
@Builder.Default
private final String managedThreadFactory = "java:comp/DefaultManagedThreadFactory";

/**
* The ordering guarantee to use.
*/
public enum ProcessingOrder {

/**
* No ordering is guaranteed, not even partition order. Fastest. Concurrency is at most the max number of
* concurrency or max number of uncommitted messages, limited by the max concurrency or uncommitted settings.
*/
UNORDERED,

/**
* Process messages within a partition in order, but process multiple partitions in parallel. Similar to running
* more consumer for a topic. Concurrency is at most the number of partitions.
*/
PARTITION,

/**
* Process messages in key order. Concurrency is at most the number of unique keys in a topic, limited by the
* max concurrency or uncommitted settings.
*/
KEY
}

/**
* The type of commit to be made, with either a transactions configured Producer where messages produced are
* committed back to the Broker along with the offsets they originated from, or with the faster simpler Consumer
* offset system either synchronously or asynchronously
*/
public enum CommitMode {

/**
* Periodically commits through the Producer using transactions. Slowest of the options, but no duplicates in
* Kafka guaranteed (message replay may cause duplicates in external systems which is unavoidable with Kafka).
* <p>
* This is separate from using an IDEMPOTENT Producer, which can be used, along with {@link
* CommitMode#PERIODIC_CONSUMER_SYNC} or {@link CommitMode#PERIODIC_CONSUMER_ASYNCHRONOUS}.
*/
PERIODIC_TRANSACTIONAL_PRODUCER,

/**
* Periodically synchronous commits with the Consumer. Much faster than {@link
* #PERIODIC_TRANSACTIONAL_PRODUCER}. Slower but potentially less duplicates than {@link
* #PERIODIC_CONSUMER_ASYNCHRONOUS} upon replay.
*/
PERIODIC_CONSUMER_SYNC,

/**
* Periodically commits offsets asynchronously. The fastest option, under normal conditions will have few or no
* duplicates. Under failure recovery may have more duplicates than {@link #PERIODIC_CONSUMER_SYNC}.
*/
PERIODIC_CONSUMER_ASYNCHRONOUS

}

/**
* The {@link ProcessingOrder} type to use
*/
@Builder.Default
private final ProcessingOrder ordering = ProcessingOrder.KEY;

/**
* The {@link CommitMode} to be used
*/
@Builder.Default
private final CommitMode commitMode = CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS;

/**
* Controls the maximum degree of concurrency to occur. Used to limit concurrent calls to external systems to a
* maximum to prevent overloading them or to a degree, using up quotas.
* <p>
* A note on quotas - if your quota is expressed as maximum concurrent calls, this works well. If it's limited in
* total requests / sec, this may still overload the system. See towards the distributed rate limiting feature for
* this to be properly addressed: https://github.com/confluentinc/parallel-consumer/issues/24 Add distributed rate
* limiting support #24.
* <p>
* In the core module, this sets the number of threads to use in the core's thread pool.
* <p>
* It's recommended to set this quite high, much higher than core count, as it's expected that these threads will
* spend most of their time blocked waiting for IO. For automatic setting of this variable, look out for issue
* https://github.com/confluentinc/parallel-consumer/issues/21 Dynamic concurrency control with flow control or tcp
* congestion control theory #21.
*/
@Builder.Default
private final int maxConcurrency = 16;

/**
* When a message fails, how long the system should wait before trying that message again.
*/
@Builder.Default
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);

/**
* When present, use this to generate the retry delay, instad of {@link #getDefaultMessageRetryDelay()}.
* <p>
* Overrides {@link #defaultMessageRetryDelay}, even if it's set.
*/
@Builder.Default
private final Function<WorkContainer, Duration> retryDelayProvider;

public static Function<WorkContainer, Duration> retryDelayProviderStatic;

public void validate() {
Objects.requireNonNull(consumer, "A consumer must be supplied");

if (isUsingTransactionalProducer() && producer == null) {
throw new IllegalArgumentException(msg("Wanting to use Transaction Producer mode ({}) without supplying a Producer instance",
commitMode));
}

//
WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay());
ParallelConsumerOptions.retryDelayProviderStatic = getRetryDelayProvider();
}

public boolean isUsingTransactionalProducer() {
return commitMode.equals(PERIODIC_TRANSACTIONAL_PRODUCER);
}

public boolean isProducerSupplied() {
return getProducer() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -61,6 +62,9 @@ public class WorkContainer<K, V> implements Comparable<WorkContainer> {
*/
private Duration retryDelay;

/**
* @see ParallelConsumerOptions#getDefaultMessageRetryDelay()
*/
@Setter
static Duration defaultRetryDelay = Duration.ofSeconds(1);

Expand Down Expand Up @@ -123,10 +127,15 @@ private Temporal tryAgainAt(WallClock clock) {
}

public Duration getRetryDelay() {
if (retryDelay == null)
return defaultRetryDelay;
else
return retryDelay;
var retryDelayProvider = ParallelConsumerOptions.retryDelayProviderStatic;
if (retryDelayProvider != null) {
return retryDelayProvider.apply(this);
} else {
if (retryDelay == null)
return defaultRetryDelay;
else
return retryDelay;
}
}

@Override
Expand Down Expand Up @@ -193,4 +202,4 @@ public long offset() {
public boolean hasPreviouslyFailed() {
return getNumberOfFailedAttempts() > 0;
}
}
}

0 comments on commit 87c1331

Please sign in to comment.