-
Notifications
You must be signed in to change notification settings - Fork 136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: #242 Explicit terminal and retry exceptions for cleaner logging and poison pills #291
Changes from 13 commits
1a18281
6af5beb
3d704c3
7f1cd22
a3331e5
33454ad
4d2eaa1
28dd391
1c37ebf
8a62163
5b2b515
a9ded2d
1a72f3c
fb862d6
1f662d9
830717a
e778a09
7d56bad
eb42645
8a8a780
486caa5
93324d0
14aad84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package io.confluent.parallelconsumer; | ||
|
||
/*- | ||
* Copyright (C) 2020-2022 Confluent, Inc. | ||
*/ | ||
|
||
/** | ||
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed, | ||
* and that it should be retired at a later time. | ||
* <p> | ||
* The advantage of throwing this exception explicitly, is that PC will not log an ERROR. If any other type of exception | ||
* is thrown by the user's function, that will be logged as an error (but will still be retried later). | ||
* <p> | ||
* So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be | ||
* logged as an error. | ||
*/ | ||
public class PCRetriableException extends RuntimeException { | ||
public PCRetriableException(String message) { | ||
super(message); | ||
} | ||
|
||
public PCRetriableException(String message, Throwable cause) { | ||
super(message, cause); | ||
} | ||
|
||
public PCRetriableException(Throwable cause) { | ||
super(cause); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package io.confluent.parallelconsumer; | ||
|
||
/*- | ||
* Copyright (C) 2020-2022 Confluent, Inc. | ||
*/ | ||
|
||
/** | ||
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed, | ||
* and that it should be retired at a later time. | ||
* <p> | ||
* The advantage of throwing this exception explicitly, is that PC will not log an ERROR. If any other type of exception | ||
* is thrown by the user's function, that will be logged as an error (but will still be retried later). | ||
* <p> | ||
* So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be | ||
* logged as an error. | ||
*/ | ||
public class PCTerminalException extends RuntimeException { | ||
public PCTerminalException(String message) { | ||
super(message); | ||
} | ||
|
||
public PCTerminalException(String message, Throwable cause) { | ||
super(message, cause); | ||
} | ||
|
||
public PCTerminalException(Throwable cause) { | ||
super(cause); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,9 +5,7 @@ | |
*/ | ||
|
||
import io.confluent.csid.utils.TimeUtils; | ||
import io.confluent.parallelconsumer.ParallelConsumer; | ||
import io.confluent.parallelconsumer.ParallelConsumerOptions; | ||
import io.confluent.parallelconsumer.PollContextInternal; | ||
import io.confluent.parallelconsumer.*; | ||
import io.confluent.parallelconsumer.state.WorkContainer; | ||
import io.confluent.parallelconsumer.state.WorkManager; | ||
import lombok.*; | ||
|
@@ -39,12 +37,16 @@ | |
import static io.confluent.csid.utils.BackportUtils.isEmpty; | ||
import static io.confluent.csid.utils.BackportUtils.toSeconds; | ||
import static io.confluent.csid.utils.StringUtils.msg; | ||
import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN; | ||
import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP; | ||
import static io.confluent.parallelconsumer.internal.State.*; | ||
import static java.time.Duration.ofMillis; | ||
import static java.util.concurrent.TimeUnit.MILLISECONDS; | ||
import static java.util.concurrent.TimeUnit.SECONDS; | ||
import static lombok.AccessLevel.PRIVATE; | ||
import static lombok.AccessLevel.PROTECTED; | ||
import static org.slf4j.event.Level.DEBUG; | ||
import static org.slf4j.event.Level.WARN; | ||
|
||
/** | ||
* @see ParallelConsumer | ||
|
@@ -1078,11 +1080,11 @@ private void updateLastCommitCheckTime() { | |
/** | ||
* Run the supplied function. | ||
*/ | ||
// todo extract class from this point | ||
protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction, | ||
Consumer<R> callback, | ||
List<WorkContainer<K, V>> workContainerBatch) { | ||
// call the user's function | ||
List<R> resultsFromUserFunction; | ||
// catch and process any internal error | ||
try { | ||
if (log.isDebugEnabled()) { | ||
// first offset of the batch | ||
|
@@ -1099,37 +1101,101 @@ protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunct | |
} | ||
|
||
PollContextInternal<K, V> context = new PollContextInternal<>(workContainerBatch); | ||
resultsFromUserFunction = usersFunction.apply(context); | ||
|
||
for (final WorkContainer<K, V> kvWorkContainer : workContainerBatch) { | ||
onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); | ||
} | ||
List<R> resultsFromUserFunction = runUserFunction(usersFunction, context); | ||
|
||
// capture each result, against the input record | ||
var intermediateResults = new ArrayList<Tuple<ConsumerRecord<K, V>, R>>(); | ||
for (R result : resultsFromUserFunction) { | ||
log.trace("Running users call back..."); | ||
callback.accept(result); | ||
} | ||
return handleUserSuccess(callback, workContainerBatch, resultsFromUserFunction); | ||
} catch (Exception e) { | ||
handleUserRetriableFailure(workContainerBatch, e); | ||
|
||
// fail or succeed, either way we're done | ||
for (var kvWorkContainer : workContainerBatch) { | ||
addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); | ||
} | ||
log.trace("User function future registered"); | ||
// throw again to make the future failed | ||
throw e; | ||
} | ||
} | ||
|
||
return intermediateResults; | ||
} catch (Exception e) { | ||
// handle fail | ||
log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox", e); | ||
for (var wc : workContainerBatch) { | ||
wc.onUserFunctionFailure(e); | ||
addToMailbox(wc); // always add on error | ||
private <R> ArrayList<Tuple<ConsumerRecord<K, V>, R>> handleUserSuccess(Consumer<R> callback, List<WorkContainer<K, V>> workContainerBatch, List<R> resultsFromUserFunction) { | ||
for (final WorkContainer<K, V> kvWorkContainer : workContainerBatch) { | ||
onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); | ||
} | ||
|
||
// capture each result, against the input record | ||
var intermediateResults = new ArrayList<Tuple<ConsumerRecord<K, V>, R>>(); | ||
for (R result : resultsFromUserFunction) { | ||
log.trace("Running users call back..."); | ||
callback.accept(result); | ||
} | ||
|
||
// fail or succeed, either way we're done | ||
for (var kvWorkContainer : workContainerBatch) { | ||
addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); | ||
} | ||
log.trace("User function future registered"); | ||
return intermediateResults; | ||
} | ||
|
||
private <R> List<R> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction, | ||
PollContextInternal<K, V> context) { | ||
try { | ||
return usersFunction.apply(context); | ||
} catch (PCTerminalException e) { | ||
var reaction = getOptions().getTerminalFailureReaction(); | ||
|
||
if (reaction == SKIP) { | ||
log.warn("Terminal error in user function, skipping record due to configuration in {} - triggering context: {}", | ||
ParallelConsumerOptions.class.getSimpleName(), | ||
context); | ||
|
||
// return empty result to cause system to skip as if it succeeded | ||
return new ArrayList<>(); | ||
} else if (reaction == SHUTDOWN) { | ||
log.error("Shutting down upon terminal failure in user function due to {} setting in {} - triggering context: {}", | ||
ParallelConsumerOptions.TerminalFailureReaction.class.getSimpleName(), | ||
ParallelConsumerOptions.class.getSimpleName(), | ||
context); | ||
|
||
closeDontDrainFirst(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would it shutdown gracefully in this case? i.e. commit all that succeeded by this point, maybe should give some time for inflight processes as well (if it doesnt already) ? - to reduce possible duplicates on restart / rebalance. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, it is graceful - |
||
|
||
// throw again to make the future failed | ||
throw e; | ||
} else { | ||
throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction)); | ||
} | ||
throw e; // trow again to make the future failed | ||
} catch (Exception e) { | ||
log.error("Unknown internal error handling user function dispatch, terminating"); | ||
|
||
closeDontDrainFirst(); | ||
|
||
// throw again to make the future failed | ||
throw e; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is current behaviour for user function exceptions ? is it a breaking change in behaviour? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. current = retry :)
yeah there was a bug - check latest version? |
||
} | ||
|
||
private void handleUserRetriableFailure(List<WorkContainer<K, V>> workContainerBatch, Exception e) { | ||
logUserFunctionException(e); | ||
markRecordsFailed(workContainerBatch, e); | ||
} | ||
|
||
private void markRecordsFailed(List<WorkContainer<K, V>> workContainerBatch, Exception e) { | ||
for (var wc : workContainerBatch) { | ||
wc.onUserFunctionFailure(e); | ||
addToMailbox(wc); // always add on error | ||
} | ||
} | ||
|
||
/** | ||
* If user explicitly throws the {@link PCRetriableException}, then don't log it, as the user is already aware. | ||
* <p> | ||
* <a href=https://english.stackexchange.com/questions/305273/retriable-or-retryable#305274>Retriable or | ||
* Retryable?</a> Kafka uses Retriable, so we'll go with that ;) | ||
*/ | ||
private void logUserFunctionException(Exception e) { | ||
boolean explicitlyRetryable = e instanceof PCRetriableException; | ||
var level = explicitlyRetryable ? DEBUG : WARN; | ||
var prefix = explicitlyRetryable ? "Explicit " + PCRetriableException.class.getSimpleName() + " caught - " : ""; | ||
var message = prefix + "Exception in user function, registering record as failed, returning to queue"; | ||
log.atLevel(level).log(message, e); | ||
} | ||
|
||
protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) { | ||
addToMailbox(wc); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update java doc