docs: Retry documentation (#194)
astubbs committed Feb 15, 2022
1 parent 84cf11a commit 4c62a9f
Showing 6 changed files with 339 additions and 7 deletions.
27 changes: 27 additions & 0 deletions .idea/runConfigurations/asciidoc_template_build.xml


27 changes: 27 additions & 0 deletions .idea/runConfigurations/license_format.xml


121 changes: 120 additions & 1 deletion README.adoc
@@ -644,6 +644,124 @@ image::https://lucid.app/publicSegments/view/f7a05e99-24e6-4ea3-b3d0-978e306aa56

Even during retries, offsets will always be committed only after successful processing, and in order.

== Retries

If processing of a record fails, the record will be placed back into its queue and retried after a configurable delay (see the `ParallelConsumerOptions` class).
Ordering guarantees will always be adhered to, regardless of failure.

A failure is denoted by *any* exception being thrown from the user's processing function.
The system catches the exception, logs it, and places the record back in the queue to be processed again later.
All thrown exceptions are considered retriable.
To not retry a record, do not throw an exception from your processing function.

If for some reason you want to proactively fail a record - rather than relying on some other system throwing an exception which you don't catch - simply throw an exception of your own design, which the system will treat the same way.
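
For instance (a sketch only - `dependencyIsHealthy` and `processRecord` here are hypothetical stand-ins for your own code):

.Example of proactively failing a record so that it will be retried
[source,java,indent=0]
----
pc.poll(consumerRecord -> {
    if (!dependencyIsHealthy()) {
        // proactive failure - throwing marks the record as failed, so it will be retried
        throw new RuntimeException("Dependency offline, failing record for retry: " + consumerRecord);
    }
    processRecord(consumerRecord);
});
----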

To configure the retry delay, see `ParallelConsumerOptions#defaultMessageRetryDelay`.
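
For example, a fixed delay can be set through the options builder (a minimal sketch - the builder method name is assumed to match the `defaultMessageRetryDelay` field shown in the options class below):

[source,java,indent=0]
----
ParallelConsumerOptions.<String, String>builder()
        .defaultMessageRetryDelay(Duration.ofSeconds(5)) // wait at least 5 seconds before retrying a failed record
        .build();
----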

At the moment there is no terminal error support, so messages will continue to be retried forever, as long as an exception continues to be thrown from the user function (see <<skipping-records>>).
This will not hold up the queues in `KEY` or `UNORDERED` modes; in `PARTITION` mode, however, it *will* block progress.
Offsets will also continue to be committed (see <<commit-mode>> and <<Offset Map>>).

=== Retry Delay Function

As part of the https://github.com/confluentinc/parallel-consumer/issues/65[enhanced retry epic], the ability to https://github.com/confluentinc/parallel-consumer/issues/82[dynamically determine the retry delay] was added.
This can be used to customise the retry delay for a record - for example, to implement exponential backoff, to use different delays for different types of records, or to derive the delay from the status of an external system.

You can access the retry count of a record through its wrapping `WorkContainer` class, which is the input to the retry delay function.

.Example retry delay function implementing exponential backoff
[source,java,indent=0]
----
final double multiplier = 2;
final int baseDelaySecond = 1;
ParallelConsumerOptions.<String, String>builder()
        .retryDelayProvider(workContainer -> {
            int numberOfFailedAttempts = workContainer.getNumberOfFailedAttempts();
            // delay grows exponentially with each failed attempt: 1s, 2s, 4s, ...
            long delayMillis = (long) (baseDelaySecond * Math.pow(multiplier, numberOfFailedAttempts) * 1000);
            return Duration.ofMillis(delayMillis);
        });
----

[[skipping-records]]
=== Skipping Records

If for whatever reason you want to skip a record, simply do not throw an exception - or catch any exception thrown, log it, swallow it, and return from the user function normally.
The system will treat this as a processing success, mark the record as completed, and move on as though it were a normal operation.

A user may choose to skip a record, for example, if it has been retried too many times, if the record is invalid, or if it doesn't need processing.

Implementing a https://github.com/confluentinc/parallel-consumer/issues/196[max retries feature] as a part of the system is planned.

.Example of skipping a record after a maximum number of retries is reached
[source,java,indent=0]
----
final int maxRetries = 10;
// note: this relies on the same ConsumerRecord instance being re-delivered on retry,
// as ConsumerRecord does not implement equals/hashCode
final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();
pc.poll(consumerRecord -> {
    // increment the attempt count on each delivery (0 on the first attempt)
    Long retryCount = retriesCount.compute(consumerRecord, (key, count) -> count == null ? 0L : count + 1);
    if (retryCount < maxRetries) {
        processRecord(consumerRecord);
        // no exception, so completed - remove from map
        retriesCount.remove(consumerRecord);
    } else {
        log.warn("Retry count {} exceeded max of {} for record {}", retryCount, maxRetries, consumerRecord);
        // giving up, remove from map
        retriesCount.remove(consumerRecord);
    }
});
----

=== Circuit Breaker Pattern

Although the system doesn't have an https://github.com/confluentinc/parallel-consumer/issues/110[explicit circuit breaker pattern feature], one can be created by combining the custom retry delay function and proactive failure.
For example, the retry delay can be calculated based upon the status of an external system - i.e. if the external system is currently out of action, use a longer retry delay.
Then in the processing function, check the status of the external system first; if it's still offline, throw an exception proactively without attempting to process the message.
This will put the message back in the queue.

.Example of a circuit breaker implementation
[source,java,indent=0]
----
final Map<String, Boolean> upMap = new ConcurrentHashMap<>();
pc.poll(consumerRecord -> {
    String serverId = extractServerId(consumerRecord);
    boolean up = upMap.computeIfAbsent(serverId, ignore -> true);
    if (!up) {
        up = updateStatusOfServer(serverId);
    }
    if (up) {
        try {
            processRecord(consumerRecord);
            // no exception, so mark the server as UP
            upMap.put(serverId, true);
        } catch (CircuitBreakingException e) {
            log.warn("Server {} is circuit broken, will retry record when server is up. Record: {}", serverId, consumerRecord);
            upMap.put(serverId, false);
            // rethrow so the record is marked as failed and retried later
            throw e;
        }
    } else {
        log.warn("Server {} currently down, will retry record later: {}", serverId, consumerRecord);
        // throw so the record is placed back in the queue and retried
        throw new CircuitBreakingException("Server " + serverId + " is down");
    }
});
----

=== Head of Line Blocking

In order to have a failing record not block progress of a partition, one of the ordering modes other than `PARTITION` must be used, so that the system is allowed to process other records - records of other keys in `KEY` order, or any record at all in `UNORDERED` mode.
This is because in `PARTITION` ordering mode records are always processed strictly in partition order, so a failing record at the head of the queue blocks all the records behind it.
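
For example, `KEY` ordering can be selected through the options builder (a sketch, assuming the `ordering` option takes the `ProcessingOrder` enum):

[source,java,indent=0]
----
ParallelConsumerOptions.<String, String>builder()
        .ordering(ParallelConsumerOptions.ProcessingOrder.KEY) // a failing record only blocks records sharing its key
        .build();
----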

=== Future Work

Improvements to this system are planned, see the following issues:

* https://github.com/confluentinc/parallel-consumer/issues/65[Enhanced retry epic #65]
* https://github.com/confluentinc/parallel-consumer/issues/48[Support scheduled message processing (scheduled retry)]
* https://github.com/confluentinc/parallel-consumer/issues/196[Provide option for max retries, and a call back when reached (potential DLQ) #196]
* https://github.com/confluentinc/parallel-consumer/issues/34[Monitor for progress and optionally shutdown (leave consumer group), skip message or send to DLQ #34]

== Result Models

* Void
@@ -659,6 +777,7 @@ When your function is actually run, a result object will be streamed back to you
After your operation completes, you can also choose to publish a result message back to Kafka.
The message publishing metadata can be streamed back to your client code.
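
For example, a consume-process-produce flow with the publishing metadata streamed back might look like the following - a sketch only, assuming the `pollAndProduce` variant of the poll API; `processBrokerRecord` and the topic name are hypothetical:

[source,java,indent=0]
----
pc.pollAndProduce(consumerRecord -> {
            String result = processBrokerRecord(consumerRecord); // hypothetical processing step
            return new ProducerRecord<String, String>("output-topic", consumerRecord.key(), result);
        }, consumeProduceResult ->
            log.debug("Message {} saved to broker at offset {}",
                    consumeProduceResult.getOut(),
                    consumeProduceResult.getMeta().offset())
);
----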

[[commit-mode]]
== Commit Mode

The system gives you three choices for how to do offset commits.
Expand All @@ -668,7 +787,7 @@ The `transactional` mode is explained in the next section.

`Asynchronous` mode is faster, as it doesn't block the control loop.

-`Synchronous` will block the processing loop until a successful commit response is received, however, `Asynchronous` will still be capped by the max processing settings in the `Options` class.
+`Synchronous` will block the processing loop until a successful commit response is received, however, `Asynchronous` will still be capped by the max processing settings in the `ParallelConsumerOptions` class.

If you're used to using the auto commit mode in the normal Kafka consumer, you can think of the `Asynchronous` mode as being similar to this.
We suggest starting with this mode, and it is the default.
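
The mode is selected through the options builder - a sketch, assuming the `commitMode` builder option and the `CommitMode` constant names below:

[source,java,indent=0]
----
ParallelConsumerOptions.<String, String>builder()
        .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS) // the asynchronous default
        .build();
----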
@@ -1,9 +1,8 @@
package io.confluent.parallelconsumer;

/*-
- * Copyright (C) 2020-2021 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.Builder;
@@ -147,13 +146,17 @@ public enum CommitMode {
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);

/**
- * When present, use this to generate the retry delay, instad of {@link #getDefaultMessageRetryDelay()}.
+ * When present, use this to generate the retry delay, instead of {@link #getDefaultMessageRetryDelay()}.
* <p>
* Overrides {@link #defaultMessageRetryDelay}, even if it's set.
*/
@Builder.Default
private final Function<WorkContainer, Duration> retryDelayProvider;

/**
* Dirty global access to the {@link #retryDelayProvider}.
*/
// TODO remove need for writeable global access
public static Function<WorkContainer, Duration> retryDelayProviderStatic;

/**
@@ -1,22 +1,25 @@
package io.confluent.parallelconsumer.examples.core;

/*-
- * Copyright (C) 2020 Confluent, Inc.
+ * Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.concurrent.CircuitBreakingException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static pl.tlinkowski.unij.api.UniLists.of;
@@ -111,4 +114,82 @@ static class Result {
String payload;
}

void customRetryDelay() {
// tag::customRetryDelay[]
final double multiplier = 2;
final int baseDelaySecond = 1;

ParallelConsumerOptions.<String, String>builder()
        .retryDelayProvider(workContainer -> {
            int numberOfFailedAttempts = workContainer.getNumberOfFailedAttempts();
            // delay grows exponentially with each failed attempt: 1s, 2s, 4s, ...
            long delayMillis = (long) (baseDelaySecond * Math.pow(multiplier, numberOfFailedAttempts) * 1000);
            return Duration.ofMillis(delayMillis);
        });
// end::customRetryDelay[]
}


void maxRetries() {
ParallelStreamProcessor<String, String> pc = ParallelStreamProcessor.createEosStreamProcessor(null);
// tag::maxRetries[]
final int maxRetries = 10;
// note: this relies on the same ConsumerRecord instance being re-delivered on retry,
// as ConsumerRecord does not implement equals/hashCode
final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();

pc.poll(consumerRecord -> {
    // increment the attempt count on each delivery (0 on the first attempt)
    Long retryCount = retriesCount.compute(consumerRecord, (key, count) -> count == null ? 0L : count + 1);
    if (retryCount < maxRetries) {
        processRecord(consumerRecord);
        // no exception, so completed - remove from map
        retriesCount.remove(consumerRecord);
    } else {
        log.warn("Retry count {} exceeded max of {} for record {}", retryCount, maxRetries, consumerRecord);
        // giving up, remove from map
        retriesCount.remove(consumerRecord);
    }
});
// end::maxRetries[]
}

private void processRecord(final ConsumerRecord<String, String> record) {
// no-op
}

void circuitBreaker() {
ParallelStreamProcessor<String, String> pc = ParallelStreamProcessor.createEosStreamProcessor(null);
// tag::circuitBreaker[]
final Map<String, Boolean> upMap = new ConcurrentHashMap<>();

pc.poll(consumerRecord -> {
    String serverId = extractServerId(consumerRecord);
    boolean up = upMap.computeIfAbsent(serverId, ignore -> true);

    if (!up) {
        up = updateStatusOfServer(serverId);
    }

    if (up) {
        try {
            processRecord(consumerRecord);
            // no exception, so mark the server as UP
            upMap.put(serverId, true);
        } catch (CircuitBreakingException e) {
            log.warn("Server {} is circuit broken, will retry record when server is up. Record: {}", serverId, consumerRecord);
            upMap.put(serverId, false);
            // rethrow so the record is marked as failed and retried later
            throw e;
        }
    } else {
        log.warn("Server {} currently down, will retry record later: {}", serverId, consumerRecord);
        // throw so the record is placed back in the queue and retried
        throw new CircuitBreakingException("Server " + serverId + " is down");
    }
});
// end::circuitBreaker[]
}

private boolean updateStatusOfServer(final String serverId) {
    // stub - in a real application, this would probe the server's health
    return false;
}

private String extractServerId(final ConsumerRecord<String, String> consumerRecord) {
    // stub - must not return null, as ConcurrentHashMap does not allow null keys
    return "server-1";
}

}
