-
Notifications
You must be signed in to change notification settings - Fork 136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: #242 Explicit terminal and retry exceptions for cleaner logging and poison pills #291
Changes from all commits
1a18281
6af5beb
3d704c3
7f1cd22
a3331e5
33454ad
4d2eaa1
28dd391
1c37ebf
8a62163
5b2b515
a9ded2d
1a72f3c
fb862d6
1f662d9
830717a
e778a09
7d56bad
eb42645
8a8a780
486caa5
93324d0
14aad84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package io.confluent.parallelconsumer; | ||
|
||
/*- | ||
* Copyright (C) 2020-2022 Confluent, Inc. | ||
*/ | ||
|
||
import lombok.ToString; | ||
|
||
/** | ||
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed, | ||
* and that it should be retired at a later time. | ||
* <p> | ||
* The advantage of throwing this exception explicitly, is that PC will not log an ERROR. If any other type of exception | ||
* is thrown by the user's function, that will be logged as an error (but will still be retried later). | ||
* <p> | ||
* So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be | ||
* logged as an error. | ||
*/ | ||
@ToString | ||
public class PCRetriableException extends PCUserException { | ||
|
||
public PCRetriableException(String message) { | ||
super(message); | ||
} | ||
|
||
public PCRetriableException(String message, Throwable cause) { | ||
super(message, cause); | ||
} | ||
|
||
public PCRetriableException(Throwable cause) { | ||
super(cause); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package io.confluent.parallelconsumer; | ||
|
||
/*- | ||
* Copyright (C) 2020-2022 Confluent, Inc. | ||
*/ | ||
|
||
/** | ||
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed, | ||
* and that it should be retired at a later time. | ||
* <p> | ||
* The advantage of throwing this exception explicitly, is that PC will not log an ERROR. If any other type of exception | ||
* is thrown by the user's function, that will be logged as an error (but will still be retried later). | ||
* <p> | ||
* So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be | ||
* logged as an error. | ||
*/ | ||
public class PCTerminalException extends PCUserException { | ||
public PCTerminalException(String message) { | ||
super(message); | ||
} | ||
|
||
public PCTerminalException(String message, Throwable cause) { | ||
super(message, cause); | ||
} | ||
|
||
public PCTerminalException(Throwable cause) { | ||
super(cause); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package io.confluent.parallelconsumer; | ||
|
||
/*- | ||
* Copyright (C) 2020-2022 Confluent, Inc. | ||
*/ | ||
|
||
/**
 * Base type for the exceptions a user's processing function can throw to tell PC explicitly how a failure should be
 * handled.
 * <p>
 * It is unchecked, so user functions can throw it (or a subclass) without declaring it.
 *
 * @see PCRetriableException
 * @see PCTerminalException
 */
public class PCUserException extends RuntimeException {

    /**
     * @param message description of the failure
     */
    public PCUserException(String message) {
        super(message);
    }

    /**
     * @param message description of the failure
     * @param cause   the underlying failure
     */
    public PCUserException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause the underlying failure
     */
    public PCUserException(Throwable cause) {
        super(cause);
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
package io.confluent.parallelconsumer.internal; | ||
|
||
/*- | ||
* Copyright (C) 2020-2022 Confluent, Inc. | ||
*/ | ||
|
||
import io.confluent.parallelconsumer.*; | ||
import io.confluent.parallelconsumer.ParallelConsumer.Tuple; | ||
import io.confluent.parallelconsumer.state.WorkContainer; | ||
import lombok.AllArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.slf4j.MDC; | ||
import pl.tlinkowski.unij.api.UniLists; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.function.Consumer; | ||
import java.util.function.Function; | ||
|
||
import static io.confluent.csid.utils.StringUtils.msg; | ||
import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN; | ||
import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP; | ||
import static io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.MDC_WORK_CONTAINER_DESCRIPTOR; | ||
|
||
/** | ||
* Manages the result of running the user's function, particularly error handling. | ||
*/ | ||
@AllArgsConstructor | ||
@Slf4j | ||
public class UserFunctionRunner<K, V> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logic seems ok. |
||
|
||
private final AbstractParallelEoSStreamProcessor<K, V> pc; | ||
|
||
/** | ||
* Run the supplied function. | ||
*/ | ||
protected <R> List<Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would it be better to change it to be a bit more readable?
If exception from stale work check is specific or handled in that call itself - can even get rid of inner runWithUserExceptions method and pull exception handling up here. esp given its the main purpose of this class... |
||
Consumer<R> callback, | ||
List<WorkContainer<K, V>> workContainerBatch) { | ||
// catch and process any internal error | ||
try { | ||
if (log.isDebugEnabled()) { | ||
// first offset of the batch | ||
MDC.put(MDC_WORK_CONTAINER_DESCRIPTOR, workContainerBatch.get(0).offset() + ""); | ||
} | ||
log.trace("Pool received: {}", workContainerBatch); | ||
|
||
// | ||
boolean workIsStale = pc.getWm().checkIfWorkIsStale(workContainerBatch); | ||
if (workIsStale) { | ||
// when epoch's change, we can't remove them from the executor pool queue, so we just have to skip them when we find them | ||
log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", workContainerBatch); | ||
return UniLists.of(); | ||
} | ||
|
||
PollContextInternal<K, V> context = new PollContextInternal<>(workContainerBatch); | ||
|
||
return runWithUserExceptions(usersFunction, context, callback); | ||
} catch (PCUserException e) { | ||
// throw again to make the future failed | ||
throw e; | ||
} catch (Exception e) { | ||
log.error("Unknown internal error handling user function dispatch, terminating"); | ||
|
||
pc.closeDontDrainFirst(); | ||
|
||
// throw again to make the future failed | ||
throw e; | ||
} | ||
Comment on lines
+60
to
+70
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not too keen on this - you are still killing PC on general Exception thrown from userFunction - effectively making default behaviour for non-specified exceptions a worst case one ( Terminal with Shutdown behaviour). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to be optional behaviour - I thought I had implemented that in this PR, but might be somewhere else. Either way, it should do like KS does - and have this be configurable. |
||
} | ||
|
||
private <R> List<Tuple<ConsumerRecord<K, V>, R>> runWithUserExceptions( | ||
Function<? super PollContextInternal<K, V>, ? extends List<R>> usersFunction, | ||
PollContextInternal<K, V> context, | ||
Consumer<R> callback) { | ||
try { | ||
var resultsFromUserFunction = usersFunction.apply(context); | ||
return handleUserSuccess(callback, context.getWorkContainers(), resultsFromUserFunction); | ||
} catch (PCTerminalException e) { | ||
return handleUserTerminalFailure(context, e, callback); | ||
} catch (PCRetriableException e) { | ||
handleExplicitUserRetriableFailure(context, e); | ||
|
||
// throw again to make the future failed | ||
throw e; | ||
} catch (Exception e) { | ||
handleImplicitUserRetriableFailure(context, e); | ||
|
||
// throw again to make the future failed | ||
throw e; | ||
} | ||
} | ||
|
||
private <R> List<Tuple<ConsumerRecord<K, V>, R>> handleUserSuccess(Consumer<R> callback, | ||
List<WorkContainer<K, V>> workContainerBatch, | ||
List<R> resultsFromUserFunction) { | ||
for (final WorkContainer<K, V> kvWorkContainer : workContainerBatch) { | ||
pc.onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); | ||
} | ||
|
||
// capture each result, against the input record | ||
var intermediateResults = new ArrayList<Tuple<ConsumerRecord<K, V>, R>>(); | ||
for (R result : resultsFromUserFunction) { | ||
log.trace("Running users call back..."); | ||
callback.accept(result); | ||
} | ||
|
||
// fail or succeed, either way we're done | ||
for (var kvWorkContainer : workContainerBatch) { | ||
pc.addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); | ||
} | ||
|
||
log.trace("User function future registered"); | ||
return intermediateResults; | ||
} | ||
|
||
private <R> List<Tuple<ConsumerRecord<K, V>, R>> handleUserTerminalFailure(PollContextInternal<K, V> context, | ||
PCTerminalException e, Consumer<R> callback) { | ||
var reaction = pc.getOptions().getTerminalFailureReaction(); | ||
|
||
if (reaction == SKIP) { | ||
log.warn("Terminal error in user function, skipping record due to configuration in {} - triggering context: {}", | ||
ParallelConsumerOptions.class.getSimpleName(), | ||
context); | ||
|
||
// return empty result to cause system to skip as if it succeeded | ||
return handleUserSuccess(callback, context.getWorkContainers(), UniLists.of()); | ||
} else if (reaction == SHUTDOWN) { | ||
log.error("Shutting down upon terminal failure in user function due to {} {} setting in {} - triggering context: {}", | ||
reaction, | ||
ParallelConsumerOptions.TerminalFailureReaction.class.getSimpleName(), | ||
ParallelConsumerOptions.class.getSimpleName(), | ||
context); | ||
|
||
pc.closeDontDrainFirst(); | ||
|
||
// throw again to make the future failed | ||
throw e; | ||
} else { | ||
throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction)); | ||
} | ||
} | ||
|
||
private void handleExplicitUserRetriableFailure(PollContextInternal<K, V> context, PCRetriableException e) { | ||
logUserFunctionException(e); | ||
markRecordsFailed(context.getWorkContainers(), e); | ||
} | ||
|
||
private void handleImplicitUserRetriableFailure(PollContextInternal<K, V> context, Exception e) { | ||
logUserFunctionException(e); | ||
markRecordsFailed(context.getWorkContainers(), e); | ||
} | ||
|
||
private void markRecordsFailed(List<WorkContainer<K, V>> workContainerBatch, Exception e) { | ||
for (var wc : workContainerBatch) { | ||
markRecordFailed(e, wc); | ||
} | ||
} | ||
|
||
private void markRecordFailed(Exception e, WorkContainer<K, V> wc) { | ||
wc.onUserFunctionFailure(e); | ||
pc.addToMailbox(wc); // always add on error | ||
} | ||
|
||
/** | ||
* If user explicitly throws the {@link PCRetriableException}, then don't log it, as the user is already aware. | ||
* <p> | ||
* <a href=https://english.stackexchange.com/questions/305273/retriable-or-retryable#305274>Retriable or | ||
* Retryable?</a> Kafka uses Retriable, so we'll go with that ;) | ||
*/ | ||
private void logUserFunctionException(Exception e) { | ||
var message = "in user function, registering record as failed, returning to queue"; | ||
if (e instanceof PCRetriableException) { | ||
log.debug("Explicit exception {} caught - {}", message, e.toString()); | ||
} else { | ||
log.warn("Exception {}", message, e); | ||
} | ||
} | ||
|
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update java doc