From 1a182811311cfeab60f11e393a69a7e460af9fb4 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 21 Apr 2022 17:33:42 +0100 Subject: [PATCH 01/19] START: Explicit retry exception for cleaner logging --- .../parallelconsumer/RetriableException.java | 29 +++++++++++++++++++ .../AbstractParallelEoSStreamProcessor.java | 8 ++++- 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RetriableException.java diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RetriableException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RetriableException.java new file mode 100644 index 000000000..5e6ed1732 --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RetriableException.java @@ -0,0 +1,29 @@ +package io.confluent.parallelconsumer; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +/** + * A user's processing function can throw this exception, which signals to PC that processing of the message has failed, + * and that it should be retired at a later time. + *

+ * The advantage of throwing this exception explicitly is that PC will not log an ERROR. If any other type of exception + * is thrown by the user's function, that will be logged as an error (but will still be retried later). + *

+ * So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be + * logged as an error. + */ +public class RetriableException extends RuntimeException { + public RetriableException(String message) { + super(message); + } + + public RetriableException(String message, Throwable cause) { + super(message, cause); + } + + public RetriableException(Throwable cause) { + super(cause); + } +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index a6f95eb97..71a6d472a 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -8,6 +8,7 @@ import io.confluent.parallelconsumer.ParallelConsumer; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.PollContextInternal; +import io.confluent.parallelconsumer.RetriableException; import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; import lombok.*; @@ -1121,7 +1122,12 @@ protected List, R>> runUserFunct return intermediateResults; } catch (Exception e) { // handle fail - log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox", e); + String msg = "Exception caught in user function running stage, registering WC as failed, returning to mailbox"; + if (e instanceof RetriableException) { + log.debug("Explicit " + RetriableException.class.getSimpleName() + " caught, logging at DEBUG only. 
" + msg, e); + } else { + log.error(msg, e); + } for (var wc : workContainerBatch) { wc.onUserFunctionFailure(e); addToMailbox(wc); // always add on error From 6af5beb348fa28b6f89880d17377e2faf236e724 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 6 May 2022 13:16:47 +0100 Subject: [PATCH 02/19] switch tests to utilise --- .../java/io/confluent/parallelconsumer/FakeRuntimeError.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java index 55bcb3acd..72a6007bc 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java @@ -7,7 +7,7 @@ /** * Used for testing error handling - easier to identify than a plan exception. */ -public class FakeRuntimeError extends RuntimeException { +public class FakeRuntimeError extends RetriableException { public FakeRuntimeError(String msg) { super(msg); } From 7f1cd22e73f980b4a36f3c24f0a911e8a3722583 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 12 May 2022 14:07:03 +0100 Subject: [PATCH 03/19] alternative handling flow --- .../AbstractParallelEoSStreamProcessor.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 71a6d472a..d609d4a04 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -1120,19 +1120,22 @@ protected 
List, R>> runUserFunct log.trace("User function future registered"); return intermediateResults; - } catch (Exception e) { - // handle fail + } catch (RetriableException e) { String msg = "Exception caught in user function running stage, registering WC as failed, returning to mailbox"; - if (e instanceof RetriableException) { - log.debug("Explicit " + RetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e); - } else { - log.error(msg, e); - } - for (var wc : workContainerBatch) { - wc.onUserFunctionFailure(e); - addToMailbox(wc); // always add on error - } + log.debug("Explicit " + RetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e); + handleFailure(workContainerBatch, e); throw e; // trow again to make the future failed + } catch (Exception e) { + log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox", e); + handleFailure(workContainerBatch, e); + throw e; // trow again to make the future failed + } + } + + private void handleFailure(final List> workContainerBatch, final Exception e) { + for (var wc : workContainerBatch) { + wc.onUserFunctionFailure(e); + addToMailbox(wc); // always add on error } } From a3331e57ff66e42b304bba8951d04452307a46e4 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 12 May 2022 15:50:08 +0100 Subject: [PATCH 04/19] use Slf4j alpha 7 for dynamic log level - check what else is new in 2 --- .../AbstractParallelEoSStreamProcessor.java | 24 ++++++++----------- pom.xml | 2 +- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index d609d4a04..997137ac8 100644 --- 
a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -46,6 +46,8 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static lombok.AccessLevel.PRIVATE; import static lombok.AccessLevel.PROTECTED; +import static org.slf4j.event.Level.DEBUG; +import static org.slf4j.event.Level.WARN; /** * @see ParallelConsumer @@ -1120,25 +1122,19 @@ protected List, R>> runUserFunct log.trace("User function future registered"); return intermediateResults; - } catch (RetriableException e) { - String msg = "Exception caught in user function running stage, registering WC as failed, returning to mailbox"; - log.debug("Explicit " + RetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e); - handleFailure(workContainerBatch, e); - throw e; // trow again to make the future failed } catch (Exception e) { - log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox", e); - handleFailure(workContainerBatch, e); + var level = e instanceof RetriableException ? DEBUG : WARN; + var prefix = e instanceof RetriableException ? 
"Explicit " + RetriableException.class.getSimpleName() + " caught: " : ""; + log.atLevel(level) + .log(prefix + "Exception caught in user function running stage, registering WC as failed, returning to mailbox", e); + for (var wc : workContainerBatch) { + wc.onUserFunctionFailure(e); + addToMailbox(wc); // always add on error + } throw e; // trow again to make the future failed } } - private void handleFailure(final List> workContainerBatch, final Exception e) { - for (var wc : workContainerBatch) { - wc.onUserFunctionFailure(e); - addToMailbox(wc); // always add on error - } - } - protected void addToMailBoxOnUserFunctionSuccess(WorkContainer wc, List resultsFromUserFunction) { addToMailbox(wc); } diff --git a/pom.xml b/pom.xml index 7f8345f22..13328272f 100644 --- a/pom.xml +++ b/pom.xml @@ -97,7 +97,7 @@ 3.0.0-M6 - 1.7.36 + 2.0.0-alpha7 3.1.0 0.1.3 From 4d2eaa1aeb8f183e251562efd8288412d22f079e Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 12 May 2022 15:52:31 +0100 Subject: [PATCH 05/19] wording --- .../internal/AbstractParallelEoSStreamProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 997137ac8..72e7b1d46 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -1126,7 +1126,7 @@ protected List, R>> runUserFunct var level = e instanceof RetriableException ? DEBUG : WARN; var prefix = e instanceof RetriableException ? 
"Explicit " + RetriableException.class.getSimpleName() + " caught: " : ""; log.atLevel(level) - .log(prefix + "Exception caught in user function running stage, registering WC as failed, returning to mailbox", e); + .log(prefix + "Exception caught in user function running stage, registering WC as failed, returning to queue", e); for (var wc : workContainerBatch) { wc.onUserFunctionFailure(e); addToMailbox(wc); // always add on error From 28dd391a98caa6766733c11b948ae3525ea31b85 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 13 May 2022 12:03:17 +0100 Subject: [PATCH 06/19] step --- .../AbstractParallelEoSStreamProcessor.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 72e7b1d46..29c3960e7 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -1123,18 +1123,25 @@ protected List, R>> runUserFunct return intermediateResults; } catch (Exception e) { - var level = e instanceof RetriableException ? DEBUG : WARN; - var prefix = e instanceof RetriableException ? 
"Explicit " + RetriableException.class.getSimpleName() + " caught: " : ""; - log.atLevel(level) - .log(prefix + "Exception caught in user function running stage, registering WC as failed, returning to queue", e); + logUserFunctionException(e); + for (var wc : workContainerBatch) { wc.onUserFunctionFailure(e); addToMailbox(wc); // always add on error } - throw e; // trow again to make the future failed + + throw e; // throw again to make the future failed } } + private void logUserFunctionException(Exception e) { + boolean retriable = e instanceof RetriableException; + var level = retriable ? DEBUG : WARN; + var prefix = retriable ? "Explicit " + RetriableException.class.getSimpleName() + " caught: " : ""; + log.atLevel(level) + .log(prefix + "Exception caught in user function running stage, registering WC as failed, returning to queue", e); + } + protected void addToMailBoxOnUserFunctionSuccess(WorkContainer wc, List resultsFromUserFunction) { addToMailbox(wc); } From 1c37ebfbb71c48e604f61a5df81bba7fb3707793 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 13 May 2022 12:08:44 +0100 Subject: [PATCH 07/19] step --- .../AbstractParallelEoSStreamProcessor.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 29c3960e7..f950ea33c 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -1134,12 +1134,17 @@ protected List, R>> runUserFunct } } + /** + * If user explicitly throws the {@link RetriableException}, then don't log as user already knows. + *

+ * https://english.stackexchange.com/questions/305273/retriable-or-retryable#305274 + */ private void logUserFunctionException(Exception e) { - boolean retriable = e instanceof RetriableException; - var level = retriable ? DEBUG : WARN; - var prefix = retriable ? "Explicit " + RetriableException.class.getSimpleName() + " caught: " : ""; - log.atLevel(level) - .log(prefix + "Exception caught in user function running stage, registering WC as failed, returning to queue", e); + boolean explicitlyRetryable = e instanceof RetriableException; + var level = explicitlyRetryable ? DEBUG : WARN; + var prefix = explicitlyRetryable ? "Explicit " + RetriableException.class.getSimpleName() + " caught - " : ""; + var message = prefix + "Exception in user function, registering record as failed, returning to queue"; + log.atLevel(level).log(message, e); } protected void addToMailBoxOnUserFunctionSuccess(WorkContainer wc, List resultsFromUserFunction) { From 5b2b515255fec9e14fb17c7e1787f3c522cfdbda Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 13 May 2022 12:35:52 +0100 Subject: [PATCH 08/19] step --- ...riableException.java => PCRetriableException.java} | 8 ++++---- .../internal/AbstractParallelEoSStreamProcessor.java | 11 ++++++----- .../confluent/parallelconsumer/FakeRuntimeError.java | 4 ++-- 3 files changed, 12 insertions(+), 11 deletions(-) rename parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/{RetriableException.java => PCRetriableException.java} (76%) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RetriableException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java similarity index 76% rename from parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RetriableException.java rename to parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java index 5e6ed1732..9b9cb24f6 100644 --- 
a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RetriableException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java @@ -14,16 +14,16 @@ * So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be * logged as an error. */ -public class RetriableException extends RuntimeException { - public RetriableException(String message) { +public class PCRetriableException extends RuntimeException { + public PCRetriableException(String message) { super(message); } - public RetriableException(String message, Throwable cause) { + public PCRetriableException(String message, Throwable cause) { super(message, cause); } - public RetriableException(Throwable cause) { + public PCRetriableException(Throwable cause) { super(cause); } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index f950ea33c..85906b430 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -5,10 +5,10 @@ */ import io.confluent.csid.utils.TimeUtils; +import io.confluent.parallelconsumer.PCRetriableException; import io.confluent.parallelconsumer.ParallelConsumer; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.PollContextInternal; -import io.confluent.parallelconsumer.RetriableException; import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; import lombok.*; @@ -1135,14 +1135,15 @@ protected List, R>> runUserFunct } /** - * If user explicitly throws the {@link RetriableException}, then 
don't log as user already knows. + * If user explicitly throws the {@link PCRetriableException}, then don't log it, as the user is already aware. *

- * https://english.stackexchange.com/questions/305273/retriable-or-retryable#305274 + * Retriable or + * Retryable? Kafka uses Retriable, so we'll go with that ;) */ private void logUserFunctionException(Exception e) { - boolean explicitlyRetryable = e instanceof RetriableException; + boolean explicitlyRetryable = e instanceof PCRetriableException; var level = explicitlyRetryable ? DEBUG : WARN; - var prefix = explicitlyRetryable ? "Explicit " + RetriableException.class.getSimpleName() + " caught - " : ""; + var prefix = explicitlyRetryable ? "Explicit " + PCRetriableException.class.getSimpleName() + " caught - " : ""; var message = prefix + "Exception in user function, registering record as failed, returning to queue"; log.atLevel(level).log(message, e); } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java index 72a6007bc..8cbc483cc 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/FakeRuntimeError.java @@ -1,13 +1,13 @@ package io.confluent.parallelconsumer; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. */ /** * Used for testing error handling - easier to identify than a plan exception. 
*/ -public class FakeRuntimeError extends RetriableException { +public class FakeRuntimeError extends PCRetriableException { public FakeRuntimeError(String msg) { super(msg); } From a9ded2dcd8e529e392f0af2b7e765a84ab9e5d7d Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 13 May 2022 13:35:59 +0100 Subject: [PATCH 09/19] terminal exception and handling --- .../parallelconsumer/PCTerminalException.java | 29 ++++++++++++ .../ParallelConsumerOptions.java | 9 ++++ .../AbstractParallelEoSStreamProcessor.java | 45 ++++++++++++++----- 3 files changed, 73 insertions(+), 10 deletions(-) create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java new file mode 100644 index 000000000..f6b57ae10 --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java @@ -0,0 +1,29 @@ +package io.confluent.parallelconsumer; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +/** + * A user's processing function can throw this exception, which signals to PC that processing of the message has failed, + * and that it should be retired at a later time. + *

+ * The advantage of throwing this exception explicitly is that PC will not log an ERROR. If any other type of exception + * is thrown by the user's function, that will be logged as an error (but will still be retried later). + *

+ * So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be + * logged as an error. + */ +public class PCTerminalException extends RuntimeException { + public PCTerminalException(String message) { + super(message); + } + + public PCTerminalException(String message, Throwable cause) { + super(message, cause); + } + + public PCTerminalException(Throwable cause) { + super(cause); + } +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 771e3e52d..a1e956805 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -223,6 +223,15 @@ public boolean isUsingBatching() { @Builder.Default private final int maxFailureHistory = 10; + private final TerminalFailureReaction terminalFailureReaction; + + public enum TerminalFailureReaction { + DIE, + SKIP, + RETRY, + // DLQ, TODO + } + /** * @return the combined target of the desired concurrency by the configured batch size */ diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 85906b430..47150846c 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -5,10 +5,7 @@ */ import io.confluent.csid.utils.TimeUtils; -import io.confluent.parallelconsumer.PCRetriableException; -import io.confluent.parallelconsumer.ParallelConsumer; -import 
io.confluent.parallelconsumer.ParallelConsumerOptions; -import io.confluent.parallelconsumer.PollContextInternal; +import io.confluent.parallelconsumer.*; import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; import lombok.*; @@ -1122,18 +1119,46 @@ protected List, R>> runUserFunct log.trace("User function future registered"); return intermediateResults; + } catch (PCTerminalException e) { + handleTerminalFailure(workContainerBatch, e); + + // return empty results + return new ArrayList<>(); } catch (Exception e) { - logUserFunctionException(e); + handleRetriableFailure(workContainerBatch, e); - for (var wc : workContainerBatch) { - wc.onUserFunctionFailure(e); - addToMailbox(wc); // always add on error - } + // throw again to make the future failed + throw e; + } + } - throw e; // throw again to make the future failed + private void handleRetriableFailure(List> workContainerBatch, Exception e) { + logUserFunctionException(e); + + for (var wc : workContainerBatch) { + wc.onUserFunctionFailure(e); + addToMailbox(wc); // always add on error } } + protected void handleTerminalFailure(List> workContainerBatch, PCTerminalException e) { + var reaction = getOptions().getTerminalFailureReaction(); + + switch (reaction) { + case SKIP -> skip(workContainerBatch); + case RETRY -> retry(workContainerBatch); + case DIE -> die(workContainerBatch); + } + } + + protected void die(List> workContainerBatch) { + System.exit(69); + } + + protected abstract void retry(List> workContainerBatch); + + protected abstract void skip(List> workContainerBatch); + /** * If user explicitly throws the {@link PCRetriableException}, then don't log it, as the user is already aware. *

From 1a72f3c7d44bbe0344cf6e1a813ad295cafe4372 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 13 May 2022 14:17:24 +0100 Subject: [PATCH 10/19] handle terminal --- .../ParallelConsumerOptions.java | 3 +- .../AbstractParallelEoSStreamProcessor.java | 105 +++++++++++------- 2 files changed, 65 insertions(+), 43 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index a1e956805..3f6e92240 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -226,9 +226,8 @@ public boolean isUsingBatching() { private final TerminalFailureReaction terminalFailureReaction; public enum TerminalFailureReaction { - DIE, + SHUTDOWN, SKIP, - RETRY, // DLQ, TODO } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 47150846c..7f251f3b1 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -37,6 +37,8 @@ import static io.confluent.csid.utils.BackportUtils.isEmpty; import static io.confluent.csid.utils.BackportUtils.toSeconds; import static io.confluent.csid.utils.StringUtils.msg; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP; import static io.confluent.parallelconsumer.internal.State.*; import static 
java.time.Duration.ofMillis; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -1078,11 +1080,11 @@ private void updateLastCommitCheckTime() { /** * Run the supplied function. */ + // todo extract class from this point protected List, R>> runUserFunction(Function, List> usersFunction, Consumer callback, List> workContainerBatch) { - // call the user's function - List resultsFromUserFunction; + // catch and process any internal error try { if (log.isDebugEnabled()) { // first offset of the batch @@ -1099,66 +1101,87 @@ protected List, R>> runUserFunct } PollContextInternal context = new PollContextInternal<>(workContainerBatch); - resultsFromUserFunction = usersFunction.apply(context); - for (final WorkContainer kvWorkContainer : workContainerBatch) { - onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); - } + List resultsFromUserFunction = runUserFunction(usersFunction, context); - // capture each result, against the input record - var intermediateResults = new ArrayList, R>>(); - for (R result : resultsFromUserFunction) { - log.trace("Running users call back..."); - callback.accept(result); - } + return handleUserSuccess(callback, workContainerBatch, resultsFromUserFunction); + } catch (Exception e) { + handleUserRetriableFailure(workContainerBatch, e); - // fail or succeed, either way we're done - for (var kvWorkContainer : workContainerBatch) { - addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); - } - log.trace("User function future registered"); + // throw again to make the future failed + throw e; + } + } - return intermediateResults; + private ArrayList, R>> handleUserSuccess(Consumer callback, List> workContainerBatch, List resultsFromUserFunction) { + for (final WorkContainer kvWorkContainer : workContainerBatch) { + onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); + } + + // capture each result, against the input record + var intermediateResults = new ArrayList, R>>(); + for (R result : 
resultsFromUserFunction) { + log.trace("Running users call back..."); + callback.accept(result); + } + + // fail or succeed, either way we're done + for (var kvWorkContainer : workContainerBatch) { + addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); + } + log.trace("User function future registered"); + return intermediateResults; + } + + private List runUserFunction(Function, List> usersFunction, + PollContextInternal context) { + try { + return usersFunction.apply(context); } catch (PCTerminalException e) { - handleTerminalFailure(workContainerBatch, e); + var reaction = getOptions().getTerminalFailureReaction(); - // return empty results - return new ArrayList<>(); + if (reaction == SKIP) { + log.warn("Terminal error in user function, skipping record due to configuration in {} - triggering context: {}", + ParallelConsumerOptions.class.getSimpleName(), + context); + + // return empty result to cause system to skip as if it succeeded + return new ArrayList<>(); + } else if (reaction == SHUTDOWN) { + log.error("Shutting down upon terminal failure in user function due to {} setting in {} - triggering context: {}", + ParallelConsumerOptions.TerminalFailureReaction.class.getSimpleName(), + ParallelConsumerOptions.class.getSimpleName(), + context); + + closeDontDrainFirst(); + + // throw again to make the future failed + throw e; + } else { + throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction)); + } } catch (Exception e) { - handleRetriableFailure(workContainerBatch, e); + log.error("Unknown internal error handling user function dispatch, terminating"); + + closeDontDrainFirst(); // throw again to make the future failed throw e; } } - private void handleRetriableFailure(List> workContainerBatch, Exception e) { + private void handleUserRetriableFailure(List> workContainerBatch, Exception e) { logUserFunctionException(e); + markRecordsFailed(workContainerBatch, e); + } + private void 
markRecordsFailed(List> workContainerBatch, Exception e) { for (var wc : workContainerBatch) { wc.onUserFunctionFailure(e); addToMailbox(wc); // always add on error } } - protected void handleTerminalFailure(List> workContainerBatch, PCTerminalException e) { - var reaction = getOptions().getTerminalFailureReaction(); - - switch (reaction) { - case SKIP -> skip(workContainerBatch); - case RETRY -> retry(workContainerBatch); - case DIE -> die(workContainerBatch); - } - } - - protected void die(List> workContainerBatch) { - System.exit(69); - } - - protected abstract void retry(List> workContainerBatch); - - protected abstract void skip(List> workContainerBatch); - /** * If user explicitly throws the {@link PCRetriableException}, then don't log it, as the user is already aware. *

From fb862d6db95f7d94ed70ee0a57ec4d31720a8c72 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 13 May 2022 16:42:42 +0100 Subject: [PATCH 11/19] introduce Offsets class --- .../confluent/parallelconsumer/Offsets.java | 12 +++++++ .../PCRetriableException.java | 31 +++++++++++++++++++ .../AbstractParallelEoSStreamProcessor.java | 12 +++---- 3 files changed, 49 insertions(+), 6 deletions(-) create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java new file mode 100644 index 000000000..73f8bcf7e --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java @@ -0,0 +1,12 @@ +package io.confluent.parallelconsumer; + +import lombok.RequiredArgsConstructor; +import lombok.experimental.Delegate; + +import java.util.List; + +@RequiredArgsConstructor +public class Offsets { + @Delegate + final private List ofssets; +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java index 9b9cb24f6..a50884cd7 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java @@ -4,6 +4,11 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ +import lombok.Getter; + +import java.util.List; +import java.util.Optional; + /** * A user's processing function can throw this exception, which signals to PC that processing of the message has failed, * and that it should be retired at a later time. @@ -15,6 +20,32 @@ * logged as an error. 
*/ public class PCRetriableException extends RuntimeException { + + @Getter + private Optional offsetsOptional = Optional.empty(); + + /** + * todo + * + * @param message + * @param offsets + */ + public PCRetriableException(String message, Offsets offsets) { + super(message); + this.offsetsOptional = Optional.of(offsets); + } + + /** + * todo + * + * @param message + * @param offsets + */ + public PCRetriableException(String message, List offsets) { + super(message); + offsetsOptional = Optional.empty(); + } + public PCRetriableException(String message) { super(message); } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 7f251f3b1..626275568 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -1106,7 +1106,9 @@ protected List, R>> runUserFunct return handleUserSuccess(callback, workContainerBatch, resultsFromUserFunction); } catch (Exception e) { - handleUserRetriableFailure(workContainerBatch, e); + log.error("Unknown internal error handling user function dispatch, terminating"); + + closeDontDrainFirst(); // throw again to make the future failed throw e; @@ -1161,18 +1163,16 @@ private List runUserFunction(Function, List> throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction)); } } catch (Exception e) { - log.error("Unknown internal error handling user function dispatch, terminating"); - - closeDontDrainFirst(); + handleUserRetriableFailure(context, e); // throw again to make the future failed throw e; } } - private void handleUserRetriableFailure(List> workContainerBatch, Exception e) { + private void 
handleUserRetriableFailure(PollContextInternal context, Exception e) { logUserFunctionException(e); - markRecordsFailed(workContainerBatch, e); + markRecordsFailed(context.getWorkContainers(), e); } private void markRecordsFailed(List> workContainerBatch, Exception e) { From 1f662d98dc594d4dcdf69350ae6983fa07333c0e Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 13 May 2022 16:52:17 +0100 Subject: [PATCH 12/19] step --- .../parallelconsumer/PCRetriableException.java | 2 +- .../parallelconsumer/PCTerminalException.java | 2 +- .../parallelconsumer/PCUserException.java | 18 ++++++++++++++++++ .../AbstractParallelEoSStreamProcessor.java | 3 +++ 4 files changed, 23 insertions(+), 2 deletions(-) create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java index a50884cd7..f17cd190c 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java @@ -19,7 +19,7 @@ * So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be * logged as an error. 
*/ -public class PCRetriableException extends RuntimeException { +public class PCRetriableException extends PCUserException { @Getter private Optional offsetsOptional = Optional.empty(); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java index f6b57ae10..922bf1de3 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCTerminalException.java @@ -14,7 +14,7 @@ * So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be * logged as an error. */ -public class PCTerminalException extends RuntimeException { +public class PCTerminalException extends PCUserException { public PCTerminalException(String message) { super(message); } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java new file mode 100644 index 000000000..5edf398c3 --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java @@ -0,0 +1,18 @@ +package io.confluent.parallelconsumer; + +/** + * todo + */ +public class PCUserException extends RuntimeException { + public PCUserException(String message) { + super(message); + } + + public PCUserException(String message, Throwable cause) { + super(message, cause); + } + + public PCUserException(Throwable cause) { + super(cause); + } +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 626275568..5425a30db 100644 --- 
a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -1105,6 +1105,9 @@ protected List, R>> runUserFunct List resultsFromUserFunction = runUserFunction(usersFunction, context); return handleUserSuccess(callback, workContainerBatch, resultsFromUserFunction); + } catch (PCUserException e) { + // throw again to make the future failed + throw e; } catch (Exception e) { log.error("Unknown internal error handling user function dispatch, terminating"); From 830717a06290768ffdffdcdbc75d6e41d6a1c972 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 13 May 2022 17:48:38 +0100 Subject: [PATCH 13/19] step --- .../java/io/confluent/parallelconsumer/Offsets.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java index 73f8bcf7e..1ca9ff888 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java @@ -4,9 +4,17 @@ import lombok.experimental.Delegate; import java.util.List; +import java.util.stream.Collectors; @RequiredArgsConstructor public class Offsets { + @Delegate - final private List ofssets; + private final List rawOffsets; + + public Offsets(List> records) { + rawOffsets = records.stream() + .map(RecordContext::offset) + .collect(Collectors.toUnmodifiableList()); + } } From e778a093a79a65f784aac3928011f2374b16943d Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 13 May 2022 21:09:53 +0100 Subject: [PATCH 14/19] test and batch drafting --- .../confluent/parallelconsumer/Offsets.java | 22 ++- .../parallelconsumer/PollContextInternal.java | 2 + .../RecordContextInternal.java 
| 2 + .../AbstractParallelEoSStreamProcessor.java | 135 +--------------- .../internal/FunctionRunnerThing.java | 152 ++++++++++++++++++ .../internal/FunctionRunnerThingTest.java | 74 +++++++++ 6 files changed, 253 insertions(+), 134 deletions(-) create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java create mode 100644 parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java index 1ca9ff888..87072c2ac 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java @@ -1,20 +1,32 @@ package io.confluent.parallelconsumer; -import lombok.RequiredArgsConstructor; import lombok.experimental.Delegate; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -@RequiredArgsConstructor public class Offsets { @Delegate private final List rawOffsets; - public Offsets(List> records) { - rawOffsets = records.stream() + public Offsets(List records) { + this.rawOffsets = records; + } + + public static Offsets of(List> records) { + return ofLongs(records.stream() .map(RecordContext::offset) - .collect(Collectors.toUnmodifiableList()); + .collect(Collectors.toUnmodifiableList())); + } + + // due to type erasure, can't use method overloading + public static Offsets ofLongs(List rawOffsetsIn) { + return new Offsets(rawOffsetsIn); + } + + public static Offsets ofLongs(long... 
rawOffsetsIn) { + return new Offsets(Arrays.stream(rawOffsetsIn).boxed().collect(Collectors.toList())); } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java index acf12eb0c..108627f42 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java @@ -6,6 +6,7 @@ import io.confluent.parallelconsumer.state.WorkContainer; import lombok.Getter; +import lombok.ToString; import lombok.experimental.Delegate; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -16,6 +17,7 @@ /** * Internal only view on the {@link PollContext}. */ +@ToString public class PollContextInternal { @Delegate diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java index 44c1a04ed..1385116cb 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java @@ -6,10 +6,12 @@ import io.confluent.parallelconsumer.state.WorkContainer; import lombok.Getter; +import lombok.ToString; /** * Internal only view of the {@link RecordContext} class. 
*/ +@ToString public class RecordContextInternal { @Getter diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 5425a30db..4e0a9da61 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -5,13 +5,14 @@ */ import io.confluent.csid.utils.TimeUtils; -import io.confluent.parallelconsumer.*; +import io.confluent.parallelconsumer.ParallelConsumer; +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContextInternal; import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; @@ -37,16 +38,12 @@ import static io.confluent.csid.utils.BackportUtils.isEmpty; import static io.confluent.csid.utils.BackportUtils.toSeconds; import static io.confluent.csid.utils.StringUtils.msg; -import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN; -import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP; import static io.confluent.parallelconsumer.internal.State.*; import static java.time.Duration.ofMillis; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static lombok.AccessLevel.PRIVATE; 
import static lombok.AccessLevel.PROTECTED; -import static org.slf4j.event.Level.DEBUG; -import static org.slf4j.event.Level.WARN; /** * @see ParallelConsumer @@ -761,6 +758,7 @@ protected void submitWorkToPool(Function, List> } } + private void submitWorkToPoolInner(final Function, List> usersFunction, final Consumer callback, final List> batch) { @@ -768,7 +766,8 @@ private void submitWorkToPoolInner(final Function, log.trace("Sending work ({}) to pool", batch); Future outputRecordFuture = workerThreadPool.submit(() -> { addInstanceMDC(); - return runUserFunction(usersFunction, callback, batch); + FunctionRunnerThing objectObjectFunctionRunnerThing = new FunctionRunnerThing<>(this); + return objectObjectFunctionRunnerThing.runUserFunction(usersFunction, callback, batch); }); // for a batch, each message in the batch shares the same result for (final WorkContainer workContainer : batch) { @@ -1077,128 +1076,6 @@ private void updateLastCommitCheckTime() { lastCommitCheckTime = Instant.now(); } - /** - * Run the supplied function. 
- */ - // todo extract class from this point - protected List, R>> runUserFunction(Function, List> usersFunction, - Consumer callback, - List> workContainerBatch) { - // catch and process any internal error - try { - if (log.isDebugEnabled()) { - // first offset of the batch - MDC.put("offset", workContainerBatch.get(0).offset() + ""); - } - log.trace("Pool received: {}", workContainerBatch); - - // - boolean workIsStale = wm.checkIfWorkIsStale(workContainerBatch); - if (workIsStale) { - // when epoch's change, we can't remove them from the executor pool queue, so we just have to skip them when we find them - log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", workContainerBatch); - return null; - } - - PollContextInternal context = new PollContextInternal<>(workContainerBatch); - - List resultsFromUserFunction = runUserFunction(usersFunction, context); - - return handleUserSuccess(callback, workContainerBatch, resultsFromUserFunction); - } catch (PCUserException e) { - // throw again to make the future failed - throw e; - } catch (Exception e) { - log.error("Unknown internal error handling user function dispatch, terminating"); - - closeDontDrainFirst(); - - // throw again to make the future failed - throw e; - } - } - - private ArrayList, R>> handleUserSuccess(Consumer callback, List> workContainerBatch, List resultsFromUserFunction) { - for (final WorkContainer kvWorkContainer : workContainerBatch) { - onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); - } - - // capture each result, against the input record - var intermediateResults = new ArrayList, R>>(); - for (R result : resultsFromUserFunction) { - log.trace("Running users call back..."); - callback.accept(result); - } - - // fail or succeed, either way we're done - for (var kvWorkContainer : workContainerBatch) { - addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); - } - log.trace("User function future 
registered"); - return intermediateResults; - } - - private List runUserFunction(Function, List> usersFunction, - PollContextInternal context) { - try { - return usersFunction.apply(context); - } catch (PCTerminalException e) { - var reaction = getOptions().getTerminalFailureReaction(); - - if (reaction == SKIP) { - log.warn("Terminal error in user function, skipping record due to configuration in {} - triggering context: {}", - ParallelConsumerOptions.class.getSimpleName(), - context); - - // return empty result to cause system to skip as if it succeeded - return new ArrayList<>(); - } else if (reaction == SHUTDOWN) { - log.error("Shutting down upon terminal failure in user function due to {} setting in {} - triggering context: {}", - ParallelConsumerOptions.TerminalFailureReaction.class.getSimpleName(), - ParallelConsumerOptions.class.getSimpleName(), - context); - - closeDontDrainFirst(); - - // throw again to make the future failed - throw e; - } else { - throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction)); - } - } catch (Exception e) { - handleUserRetriableFailure(context, e); - - // throw again to make the future failed - throw e; - } - } - - private void handleUserRetriableFailure(PollContextInternal context, Exception e) { - logUserFunctionException(e); - markRecordsFailed(context.getWorkContainers(), e); - } - - private void markRecordsFailed(List> workContainerBatch, Exception e) { - for (var wc : workContainerBatch) { - wc.onUserFunctionFailure(e); - addToMailbox(wc); // always add on error - } - } - - /** - * If user explicitly throws the {@link PCRetriableException}, then don't log it, as the user is already aware. - *

- * Retriable or - * Retryable? Kafka uses Retriable, so we'll go with that ;) - */ - private void logUserFunctionException(Exception e) { - boolean explicitlyRetryable = e instanceof PCRetriableException; - var level = explicitlyRetryable ? DEBUG : WARN; - var prefix = explicitlyRetryable ? "Explicit " + PCRetriableException.class.getSimpleName() + " caught - " : ""; - var message = prefix + "Exception in user function, registering record as failed, returning to queue"; - log.atLevel(level).log(message, e); - } - protected void addToMailBoxOnUserFunctionSuccess(WorkContainer wc, List resultsFromUserFunction) { addToMailbox(wc); } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java new file mode 100644 index 000000000..e89044ce5 --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java @@ -0,0 +1,152 @@ +package io.confluent.parallelconsumer.internal; + +import io.confluent.parallelconsumer.*; +import io.confluent.parallelconsumer.state.WorkContainer; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.MDC; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +import static io.confluent.csid.utils.StringUtils.msg; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP; +import static org.slf4j.event.Level.DEBUG; +import static org.slf4j.event.Level.WARN; + +@Value + +@Slf4j +public class FunctionRunnerThing { + + AbstractParallelEoSStreamProcessor pc; + + /** + * Run the supplied function. 
+ */ + // todo extract class from this point + protected List, R>> runUserFunction(Function, List> usersFunction, + Consumer callback, + List> workContainerBatch) { + // catch and process any internal error + try { + if (log.isDebugEnabled()) { + // first offset of the batch + MDC.put("offset", workContainerBatch.get(0).offset() + ""); + } + log.trace("Pool received: {}", workContainerBatch); + + // + boolean workIsStale = pc.getWm().checkIfWorkIsStale(workContainerBatch); + if (workIsStale) { + // when epoch's change, we can't remove them from the executor pool queue, so we just have to skip them when we find them + log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", workContainerBatch); + return null; + } + + PollContextInternal context = new PollContextInternal<>(workContainerBatch); + + List resultsFromUserFunction = runUserFunction(usersFunction, context); + + return handleUserSuccess(callback, workContainerBatch, resultsFromUserFunction); + } catch (PCUserException e) { + // throw again to make the future failed + throw e; + } catch (Exception e) { + log.error("Unknown internal error handling user function dispatch, terminating"); + + pc.closeDontDrainFirst(); + + // throw again to make the future failed + throw e; + } + } + + private ArrayList, R>> handleUserSuccess(Consumer callback, List> workContainerBatch, List resultsFromUserFunction) { + for (final WorkContainer kvWorkContainer : workContainerBatch) { + pc.onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); + } + + // capture each result, against the input record + var intermediateResults = new ArrayList, R>>(); + for (R result : resultsFromUserFunction) { + log.trace("Running users call back..."); + callback.accept(result); + } + + // fail or succeed, either way we're done + for (var kvWorkContainer : workContainerBatch) { + pc.addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); + } + 
log.trace("User function future registered"); + return intermediateResults; + } + + private List runUserFunction(Function, List> usersFunction, + PollContextInternal context) { + try { + return usersFunction.apply(context); + } catch (PCTerminalException e) { + var reaction = pc.getOptions().getTerminalFailureReaction(); + + if (reaction == SKIP) { + log.warn("Terminal error in user function, skipping record due to configuration in {} - triggering context: {}", + ParallelConsumerOptions.class.getSimpleName(), + context); + + // return empty result to cause system to skip as if it succeeded + // todo call on success to do all the extra stuff + return new ArrayList<>(); + } else if (reaction == SHUTDOWN) { + log.error("Shutting down upon terminal failure in user function due to {} {} setting in {} - triggering context: {}", + reaction, + ParallelConsumerOptions.TerminalFailureReaction.class.getSimpleName(), + ParallelConsumerOptions.class.getSimpleName(), + context); + + pc.closeDontDrainFirst(); + + // throw again to make the future failed + throw e; + } else { + throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction)); + } + } catch (Exception e) { + handleUserRetriableFailure(context, e); + + // throw again to make the future failed + throw e; + } + } + + private void handleUserRetriableFailure(PollContextInternal context, Exception e) { + logUserFunctionException(e); + markRecordsFailed(context.getWorkContainers(), e); + } + + private void markRecordsFailed(List> workContainerBatch, Exception e) { + for (var wc : workContainerBatch) { + wc.onUserFunctionFailure(e); + pc.addToMailbox(wc); // always add on error + } + } + + /** + * If user explicitly throws the {@link PCRetriableException}, then don't log it, as the user is already aware. + *

+ * Retriable or + * Retryable? Kafka uses Retriable, so we'll go with that ;) + */ + private void logUserFunctionException(Exception e) { + boolean explicitlyRetryable = e instanceof PCRetriableException; + var level = explicitlyRetryable ? DEBUG : WARN; + var prefix = explicitlyRetryable ? "Explicit " + PCRetriableException.class.getSimpleName() + " caught - " : ""; + var message = prefix + "Exception in user function, registering record as failed, returning to queue"; + log.atLevel(level).log(message, e); + } +} + diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java new file mode 100644 index 000000000..4a43bf3a1 --- /dev/null +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java @@ -0,0 +1,74 @@ +package io.confluent.parallelconsumer.internal; + +import io.confluent.parallelconsumer.Offsets; +import io.confluent.parallelconsumer.PCRetriableException; +import io.confluent.parallelconsumer.PCTerminalException; +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.state.ModelUtils; +import io.confluent.parallelconsumer.state.WorkManager; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * @see FunctionRunnerThing + */ +class FunctionRunnerThingTest { + + @Test + void test() { + var mock = mock(AbstractParallelEoSStreamProcessor.class); + when(mock.getWm()).thenReturn(mock(WorkManager.class)); + when(mock.getOptions()).thenReturn(ParallelConsumerOptions.builder() + 
.terminalFailureReaction(SHUTDOWN) + .build()); + + FunctionRunnerThing r = new FunctionRunnerThing<>(mock); + var workFor = ModelUtils.createWorkFor(0); + r.runUserFunction(context -> { + throw new PCTerminalException("fake"); + }, + o -> { + }, List.of(workFor)); + } + + @Test + void testTwo() { + var mock = mock(AbstractParallelEoSStreamProcessor.class); + when(mock.getWm()).thenReturn(mock(WorkManager.class)); + when(mock.getOptions()).thenReturn(ParallelConsumerOptions.builder() + .terminalFailureReaction(SKIP) + .build()); + + FunctionRunnerThing r = new FunctionRunnerThing<>(mock); + var workFor = ModelUtils.createWorkFor(0); + r.runUserFunction(context -> { + throw new PCTerminalException("fake"); + }, + o -> { + }, List.of(workFor)); + } + + @Test + void testThree() { + var mock = mock(AbstractParallelEoSStreamProcessor.class); + when(mock.getWm()).thenReturn(mock(WorkManager.class)); + when(mock.getOptions()).thenReturn(ParallelConsumerOptions.builder() + .terminalFailureReaction(SKIP) + .build()); + + FunctionRunnerThing r = new FunctionRunnerThing<>(mock); + var workFor = ModelUtils.createWorkFor(0); + r.runUserFunction(context -> { + throw new PCRetriableException("fake", Offsets.ofLongs(0, 1)); + }, + o -> { + }, List.of(workFor)); + } + +} From 7d56bad3f58429599825d1444076375d83960569 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Sat, 14 May 2022 08:30:18 +0100 Subject: [PATCH 15/19] test and batch drafting --- .../PCRetriableException.java | 2 + .../internal/FunctionRunnerThing.java | 121 ++++++++++++------ .../internal/FunctionRunnerThingTest.java | 51 +++----- 3 files changed, 98 insertions(+), 76 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java index f17cd190c..d1d027070 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java +++ 
b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java @@ -5,6 +5,7 @@ */ import lombok.Getter; +import lombok.ToString; import java.util.List; import java.util.Optional; @@ -19,6 +20,7 @@ * So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be * logged as an error. */ +@ToString public class PCRetriableException extends PCUserException { @Getter diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java index e89044ce5..18c66bc63 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java @@ -1,14 +1,17 @@ package io.confluent.parallelconsumer.internal; import io.confluent.parallelconsumer.*; +import io.confluent.parallelconsumer.ParallelConsumer.Tuple; import io.confluent.parallelconsumer.state.WorkContainer; -import lombok.Value; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.MDC; +import pl.tlinkowski.unij.api.UniLists; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; @@ -18,20 +21,18 @@ import static org.slf4j.event.Level.DEBUG; import static org.slf4j.event.Level.WARN; -@Value - +@AllArgsConstructor @Slf4j public class FunctionRunnerThing { - AbstractParallelEoSStreamProcessor pc; + private AbstractParallelEoSStreamProcessor pc; /** * Run the supplied function. 
*/ - // todo extract class from this point - protected List, R>> runUserFunction(Function, List> usersFunction, - Consumer callback, - List> workContainerBatch) { + protected List, R>> runUserFunction(Function, List> usersFunction, + Consumer callback, + List> workContainerBatch) { // catch and process any internal error try { if (log.isDebugEnabled()) { @@ -50,9 +51,7 @@ protected List, R>> runUserFunct PollContextInternal context = new PollContextInternal<>(workContainerBatch); - List resultsFromUserFunction = runUserFunction(usersFunction, context); - - return handleUserSuccess(callback, workContainerBatch, resultsFromUserFunction); + return runWithUserExceptions(usersFunction, context, callback); } catch (PCUserException e) { // throw again to make the future failed throw e; @@ -66,13 +65,15 @@ protected List, R>> runUserFunct } } - private ArrayList, R>> handleUserSuccess(Consumer callback, List> workContainerBatch, List resultsFromUserFunction) { + private ArrayList, R>> handleUserSuccess(Consumer callback, + List> workContainerBatch, + List resultsFromUserFunction) { for (final WorkContainer kvWorkContainer : workContainerBatch) { pc.onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); } // capture each result, against the input record - var intermediateResults = new ArrayList, R>>(); + var intermediateResults = new ArrayList, R>>(); for (R result : resultsFromUserFunction) { log.trace("Running users call back..."); callback.accept(result); @@ -82,39 +83,25 @@ private ArrayList, R>> handleUse for (var kvWorkContainer : workContainerBatch) { pc.addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); } + log.trace("User function future registered"); return intermediateResults; } - private List runUserFunction(Function, List> usersFunction, - PollContextInternal context) { + private List, R>> runWithUserExceptions( + Function, ? 
extends List> usersFunction, + PollContextInternal context, + Consumer callback) { try { - return usersFunction.apply(context); + var resultsFromUserFunction = usersFunction.apply(context); + return handleUserSuccess(callback, context.getWorkContainers(), resultsFromUserFunction); } catch (PCTerminalException e) { - var reaction = pc.getOptions().getTerminalFailureReaction(); - - if (reaction == SKIP) { - log.warn("Terminal error in user function, skipping record due to configuration in {} - triggering context: {}", - ParallelConsumerOptions.class.getSimpleName(), - context); - - // return empty result to cause system to skip as if it succeeded - // todo call on success to do all the extra stuff - return new ArrayList<>(); - } else if (reaction == SHUTDOWN) { - log.error("Shutting down upon terminal failure in user function due to {} {} setting in {} - triggering context: {}", - reaction, - ParallelConsumerOptions.TerminalFailureReaction.class.getSimpleName(), - ParallelConsumerOptions.class.getSimpleName(), - context); - - pc.closeDontDrainFirst(); - - // throw again to make the future failed - throw e; - } else { - throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction)); - } + return handleTerminalFailure(context, e, callback); + } catch (PCRetriableException e) { + handleExplicitUserRetriableFailure(context, e); + + // throw again to make the future failed + throw e; } catch (Exception e) { handleUserRetriableFailure(context, e); @@ -123,6 +110,48 @@ private List runUserFunction(Function, List> } } + private List, R>> handleTerminalFailure(PollContextInternal context, + PCTerminalException e, Consumer callback) { + var reaction = pc.getOptions().getTerminalFailureReaction(); + + if (reaction == SKIP) { + log.warn("Terminal error in user function, skipping record due to configuration in {} - triggering context: {}", + ParallelConsumerOptions.class.getSimpleName(), + context); + + // return empty result to cause 
system to skip as if it succeeded + return handleUserSuccess(callback, context.getWorkContainers(), UniLists.of()); + } else if (reaction == SHUTDOWN) { + log.error("Shutting down upon terminal failure in user function due to {} {} setting in {} - triggering context: {}", + reaction, + ParallelConsumerOptions.TerminalFailureReaction.class.getSimpleName(), + ParallelConsumerOptions.class.getSimpleName(), + context); + + pc.closeDontDrainFirst(); + + // throw again to make the future failed + throw e; + } else { + throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction)); + } + } + + private void handleExplicitUserRetriableFailure(PollContextInternal context, PCRetriableException e) { + logUserFunctionException(e); + + Optional offsetsOptional = e.getOffsetsOptional(); + if (offsetsOptional.isPresent()) { + Offsets offsets = offsetsOptional.get(); + log.debug("Specific offsets present in {}", e.toString()); + context.streamInternal() + .filter(offsets::contains) + .forEach(work -> markRecordFailed(e, work)); + } else { + markRecordsFailed(context.getWorkContainers(), e); + } + } + private void handleUserRetriableFailure(PollContextInternal context, Exception e) { logUserFunctionException(e); markRecordsFailed(context.getWorkContainers(), e); @@ -130,11 +159,19 @@ private void handleUserRetriableFailure(PollContextInternal context, Excep private void markRecordsFailed(List> workContainerBatch, Exception e) { for (var wc : workContainerBatch) { - wc.onUserFunctionFailure(e); - pc.addToMailbox(wc); // always add on error + markRecordFailed(e, wc); } } + private void markRecordFailed(Exception e, RecordContextInternal wc) { + markRecordFailed(e, wc.getWorkContainer()); + } + + private void markRecordFailed(Exception e, WorkContainer wc) { + wc.onUserFunctionFailure(e); + pc.addToMailbox(wc); // always add on error + } + /** * If user explicitly throws the {@link PCRetriableException}, then don't log it, as the user is 
already aware. *

diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java index 4a43bf3a1..023c7660e 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java @@ -1,14 +1,13 @@ package io.confluent.parallelconsumer.internal; -import io.confluent.parallelconsumer.Offsets; -import io.confluent.parallelconsumer.PCRetriableException; -import io.confluent.parallelconsumer.PCTerminalException; -import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.*; +import io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction; import io.confluent.parallelconsumer.state.ModelUtils; import io.confluent.parallelconsumer.state.WorkManager; import org.junit.jupiter.api.Test; import java.util.List; +import java.util.function.Function; import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN; import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP; @@ -22,53 +21,37 @@ class FunctionRunnerThingTest { @Test void test() { + run(SHUTDOWN, context -> { + throw new PCTerminalException("fake"); + }); + } + + private void run(TerminalFailureReaction shutdown, Function, List> fake) { var mock = mock(AbstractParallelEoSStreamProcessor.class); when(mock.getWm()).thenReturn(mock(WorkManager.class)); when(mock.getOptions()).thenReturn(ParallelConsumerOptions.builder() - .terminalFailureReaction(SHUTDOWN) + .terminalFailureReaction(shutdown) .build()); FunctionRunnerThing r = new FunctionRunnerThing<>(mock); var workFor = ModelUtils.createWorkFor(0); - r.runUserFunction(context -> { - throw new PCTerminalException("fake"); - }, + 
r.runUserFunction(fake, o -> { }, List.of(workFor)); } @Test void testTwo() { - var mock = mock(AbstractParallelEoSStreamProcessor.class); - when(mock.getWm()).thenReturn(mock(WorkManager.class)); - when(mock.getOptions()).thenReturn(ParallelConsumerOptions.builder() - .terminalFailureReaction(SKIP) - .build()); - - FunctionRunnerThing r = new FunctionRunnerThing<>(mock); - var workFor = ModelUtils.createWorkFor(0); - r.runUserFunction(context -> { - throw new PCTerminalException("fake"); - }, - o -> { - }, List.of(workFor)); + run(SKIP, context -> { + throw new PCTerminalException("fake"); + }); } @Test void testThree() { - var mock = mock(AbstractParallelEoSStreamProcessor.class); - when(mock.getWm()).thenReturn(mock(WorkManager.class)); - when(mock.getOptions()).thenReturn(ParallelConsumerOptions.builder() - .terminalFailureReaction(SKIP) - .build()); - - FunctionRunnerThing r = new FunctionRunnerThing<>(mock); - var workFor = ModelUtils.createWorkFor(0); - r.runUserFunction(context -> { - throw new PCRetriableException("fake", Offsets.ofLongs(0, 1)); - }, - o -> { - }, List.of(workFor)); + run(SKIP, context -> { + throw new PCRetriableException("fake", Offsets.ofLongs(0, 1)); + }); } } From eb426451ada5dce4d3ae8ee1331aa098f8cd9922 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Sat, 14 May 2022 08:57:18 +0100 Subject: [PATCH 16/19] test and batch drafting --- .../confluent/parallelconsumer/Offsets.java | 10 ++- .../AbstractParallelEoSStreamProcessor.java | 4 +- ...nnerThing.java => UserFunctionRunner.java} | 90 ++++++++++--------- ...gTest.java => UserFunctionRunnerTest.java} | 14 +-- .../src/test/resources/logback-test.xml | 4 +- pom.xml | 2 +- 6 files changed, 64 insertions(+), 60 deletions(-) rename parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/{FunctionRunnerThing.java => UserFunctionRunner.java} (82%) rename parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/{FunctionRunnerThingTest.java => 
UserFunctionRunnerTest.java} (84%) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java index 87072c2ac..61cd2168d 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java @@ -1,11 +1,13 @@ package io.confluent.parallelconsumer; +import lombok.ToString; import lombok.experimental.Delegate; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +@ToString public class Offsets { @Delegate @@ -15,18 +17,18 @@ public Offsets(List records) { this.rawOffsets = records; } - public static Offsets of(List> records) { - return ofLongs(records.stream() + public static Offsets from(List> records) { + return of(records.stream() .map(RecordContext::offset) .collect(Collectors.toUnmodifiableList())); } // due to type erasure, can't use method overloading - public static Offsets ofLongs(List rawOffsetsIn) { + public static Offsets of(List rawOffsetsIn) { return new Offsets(rawOffsetsIn); } - public static Offsets ofLongs(long... rawOffsetsIn) { + public static Offsets of(long... 
rawOffsetsIn) { return new Offsets(Arrays.stream(rawOffsetsIn).boxed().collect(Collectors.toList())); } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 4e0a9da61..847045e6e 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -766,8 +766,8 @@ private void submitWorkToPoolInner(final Function, log.trace("Sending work ({}) to pool", batch); Future outputRecordFuture = workerThreadPool.submit(() -> { addInstanceMDC(); - FunctionRunnerThing objectObjectFunctionRunnerThing = new FunctionRunnerThing<>(this); - return objectObjectFunctionRunnerThing.runUserFunction(usersFunction, callback, batch); + UserFunctionRunner runner = new UserFunctionRunner<>(this); + return runner.runUserFunction(usersFunction, callback, batch); }); // for a batch, each message in the batch shares the same result for (final WorkContainer workContainer : batch) { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java similarity index 82% rename from parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java rename to parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java index 18c66bc63..bb4f47fa1 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/FunctionRunnerThing.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java @@ -18,12 +18,10 @@ import static 
io.confluent.csid.utils.StringUtils.msg; import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN; import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP; -import static org.slf4j.event.Level.DEBUG; -import static org.slf4j.event.Level.WARN; @AllArgsConstructor @Slf4j -public class FunctionRunnerThing { +public class UserFunctionRunner { private AbstractParallelEoSStreamProcessor pc; @@ -46,7 +44,7 @@ protected List, R>> runUserFunction(Function context = new PollContextInternal<>(workContainerBatch); @@ -65,29 +63,6 @@ protected List, R>> runUserFunction(Function ArrayList, R>> handleUserSuccess(Consumer callback, - List> workContainerBatch, - List resultsFromUserFunction) { - for (final WorkContainer kvWorkContainer : workContainerBatch) { - pc.onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); - } - - // capture each result, against the input record - var intermediateResults = new ArrayList, R>>(); - for (R result : resultsFromUserFunction) { - log.trace("Running users call back..."); - callback.accept(result); - } - - // fail or succeed, either way we're done - for (var kvWorkContainer : workContainerBatch) { - pc.addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); - } - - log.trace("User function future registered"); - return intermediateResults; - } - private List, R>> runWithUserExceptions( Function, ? 
extends List> usersFunction, PollContextInternal context, @@ -96,22 +71,45 @@ private List, R>> runWithUserExceptions( var resultsFromUserFunction = usersFunction.apply(context); return handleUserSuccess(callback, context.getWorkContainers(), resultsFromUserFunction); } catch (PCTerminalException e) { - return handleTerminalFailure(context, e, callback); + return handleUserTerminalFailure(context, e, callback); } catch (PCRetriableException e) { handleExplicitUserRetriableFailure(context, e); // throw again to make the future failed throw e; } catch (Exception e) { - handleUserRetriableFailure(context, e); + handleImplicitUserRetriableFailure(context, e); // throw again to make the future failed throw e; } } - private List, R>> handleTerminalFailure(PollContextInternal context, - PCTerminalException e, Consumer callback) { + private List, R>> handleUserSuccess(Consumer callback, + List> workContainerBatch, + List resultsFromUserFunction) { + for (final WorkContainer kvWorkContainer : workContainerBatch) { + pc.onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); + } + + // capture each result, against the input record + var intermediateResults = new ArrayList, R>>(); + for (R result : resultsFromUserFunction) { + log.trace("Running users call back..."); + callback.accept(result); + } + + // fail or succeed, either way we're done + for (var kvWorkContainer : workContainerBatch) { + pc.addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); + } + + log.trace("User function future registered"); + return intermediateResults; + } + + private List, R>> handleUserTerminalFailure(PollContextInternal context, + PCTerminalException e, Consumer callback) { var reaction = pc.getOptions().getTerminalFailureReaction(); if (reaction == SKIP) { @@ -143,16 +141,22 @@ private void handleExplicitUserRetriableFailure(PollContextInternal contex Optional offsetsOptional = e.getOffsetsOptional(); if (offsetsOptional.isPresent()) { Offsets offsets = 
offsetsOptional.get(); - log.debug("Specific offsets present in {}", e.toString()); + log.debug("Specific offsets present in {}", offsets); context.streamInternal() - .filter(offsets::contains) - .forEach(work -> markRecordFailed(e, work)); + .forEach(work -> { + if (offsets.contains(work)) { + markRecordFailed(e, work.getWorkContainer()); + } else { + // mark record succeeded + handleUserSuccess(); + } + }); } else { markRecordsFailed(context.getWorkContainers(), e); } } - private void handleUserRetriableFailure(PollContextInternal context, Exception e) { + private void handleImplicitUserRetriableFailure(PollContextInternal context, Exception e) { logUserFunctionException(e); markRecordsFailed(context.getWorkContainers(), e); } @@ -163,10 +167,6 @@ private void markRecordsFailed(List> workContainerBatch, Exc } } - private void markRecordFailed(Exception e, RecordContextInternal wc) { - markRecordFailed(e, wc.getWorkContainer()); - } - private void markRecordFailed(Exception e, WorkContainer wc) { wc.onUserFunctionFailure(e); pc.addToMailbox(wc); // always add on error @@ -179,11 +179,13 @@ private void markRecordFailed(Exception e, WorkContainer wc) { * Retryable? Kafka uses Retriable, so we'll go with that ;) */ private void logUserFunctionException(Exception e) { - boolean explicitlyRetryable = e instanceof PCRetriableException; - var level = explicitlyRetryable ? DEBUG : WARN; - var prefix = explicitlyRetryable ? 
"Explicit " + PCRetriableException.class.getSimpleName() + " caught - " : ""; - var message = prefix + "Exception in user function, registering record as failed, returning to queue"; - log.atLevel(level).log(message, e); + var message = "in user function, registering record as failed, returning to queue"; + if (e instanceof PCRetriableException) { + log.debug("Explicit exception {} caught - {}", message, e.toString()); + } else { + log.warn("Exception {}", message, e); + } } + } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java similarity index 84% rename from parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java rename to parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java index 023c7660e..894bd6040 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/FunctionRunnerThingTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java @@ -15,12 +15,12 @@ import static org.mockito.Mockito.when; /** - * @see FunctionRunnerThing + * @see UserFunctionRunner */ -class FunctionRunnerThingTest { +class UserFunctionRunnerTest { @Test - void test() { + void shutdown() { run(SHUTDOWN, context -> { throw new PCTerminalException("fake"); }); @@ -33,7 +33,7 @@ private void run(TerminalFailureReaction shutdown, Function r = new FunctionRunnerThing<>(mock); + UserFunctionRunner r = new UserFunctionRunner<>(mock); var workFor = ModelUtils.createWorkFor(0); r.runUserFunction(fake, o -> { @@ -41,16 +41,16 @@ private void run(TerminalFailureReaction shutdown, Function { throw new PCTerminalException("fake"); }); } @Test - void testThree() { + void offsets() { run(SKIP, context -> { - throw new PCRetriableException("fake", 
Offsets.ofLongs(0, 1)); + throw new PCRetriableException("fake", Offsets.of(0, 1)); }); } diff --git a/parallel-consumer-core/src/test/resources/logback-test.xml b/parallel-consumer-core/src/test/resources/logback-test.xml index e7d4e3b4f..cafd10476 100644 --- a/parallel-consumer-core/src/test/resources/logback-test.xml +++ b/parallel-consumer-core/src/test/resources/logback-test.xml @@ -27,8 +27,8 @@ - - + + diff --git a/pom.xml b/pom.xml index 044d68b4d..629a62737 100644 --- a/pom.xml +++ b/pom.xml @@ -99,7 +99,7 @@ 3.0.0-M6 - 2.0.0-alpha7 + 1.7.36 3.1.0 0.1.3 From 8a8a780d4ef1b94f88b6783cfb392266ef6f68e8 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Sat, 14 May 2022 09:19:53 +0100 Subject: [PATCH 17/19] test and batch drafting --- .../src/main/java/io/confluent/parallelconsumer/Offsets.java | 4 ++++ .../io/confluent/parallelconsumer/PCRetriableException.java | 2 +- .../java/io/confluent/parallelconsumer/PCUserException.java | 4 ++++ .../parallelconsumer/internal/UserFunctionRunner.java | 4 ++++ .../parallelconsumer/internal/UserFunctionRunnerTest.java | 4 ++++ 5 files changed, 17 insertions(+), 1 deletion(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java index 61cd2168d..4207ce1cd 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java @@ -1,5 +1,9 @@ package io.confluent.parallelconsumer; +/*- + * Copyright (C) 2020-2022 Confluent, Inc. 
+ */ + import lombok.ToString; import lombok.experimental.Delegate; diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java index d1d027070..c23155593 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java @@ -45,7 +45,7 @@ public PCRetriableException(String message, Offsets offsets) { */ public PCRetriableException(String message, List offsets) { super(message); - offsetsOptional = Optional.empty(); + offsetsOptional = Optional.of(Offsets.of(offsets)); } public PCRetriableException(String message) { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java index 5edf398c3..58fcf0484 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCUserException.java @@ -1,5 +1,9 @@ package io.confluent.parallelconsumer; +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + /** * todo */ diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java index bb4f47fa1..f9b1c52f9 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java @@ -1,5 +1,9 @@ package io.confluent.parallelconsumer.internal; +/*- + * Copyright (C) 2020-2022 Confluent, Inc. 
+ */ + import io.confluent.parallelconsumer.*; import io.confluent.parallelconsumer.ParallelConsumer.Tuple; import io.confluent.parallelconsumer.state.WorkContainer; diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java index 894bd6040..b644ff04a 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java @@ -1,5 +1,9 @@ package io.confluent.parallelconsumer.internal; +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + import io.confluent.parallelconsumer.*; import io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction; import io.confluent.parallelconsumer.state.ModelUtils; From 93324d0403a90cab1f38d6153c3c2972ce4ecfa2 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 20 May 2022 11:43:46 +0100 Subject: [PATCH 18/19] revert partial batch failure --- .../PCRetriableException.java | 29 ------------------- .../internal/UserFunctionRunner.java | 19 +----------- .../internal/UserFunctionRunnerTest.java | 11 ++----- 3 files changed, 4 insertions(+), 55 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java index c23155593..a23619e40 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java @@ -4,12 +4,8 @@ * Copyright (C) 2020-2022 Confluent, Inc. 
*/ -import lombok.Getter; import lombok.ToString; -import java.util.List; -import java.util.Optional; - /** * A user's processing function can throw this exception, which signals to PC that processing of the message has failed, * and that it should be retired at a later time. @@ -23,31 +19,6 @@ @ToString public class PCRetriableException extends PCUserException { - @Getter - private Optional offsetsOptional = Optional.empty(); - - /** - * todo - * - * @param message - * @param offsets - */ - public PCRetriableException(String message, Offsets offsets) { - super(message); - this.offsetsOptional = Optional.of(offsets); - } - - /** - * todo - * - * @param message - * @param offsets - */ - public PCRetriableException(String message, List offsets) { - super(message); - offsetsOptional = Optional.of(Offsets.of(offsets)); - } - public PCRetriableException(String message) { super(message); } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java index 223240484..4964b52f4 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java @@ -15,7 +15,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; @@ -142,23 +141,7 @@ private List, R>> handleUserTerminalFailure(PollC private void handleExplicitUserRetriableFailure(PollContextInternal context, PCRetriableException e) { logUserFunctionException(e); - - Optional offsetsOptional = e.getOffsetsOptional(); - if (offsetsOptional.isPresent()) { - Offsets offsets = offsetsOptional.get(); - log.debug("Specific offsets present in {}", offsets); - context.streamInternal() - .forEach(work -> { - if (offsets.contains(work)) 
{ - markRecordFailed(e, work.getWorkContainer()); - } else { - // mark record succeeded - handleUserSuccess(); - } - }); - } else { - markRecordsFailed(context.getWorkContainers(), e); - } + markRecordsFailed(context.getWorkContainers(), e); } private void handleImplicitUserRetriableFailure(PollContextInternal context, Exception e) { diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java index b644ff04a..84d928b27 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java @@ -4,8 +4,10 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ -import io.confluent.parallelconsumer.*; +import io.confluent.parallelconsumer.PCTerminalException; +import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction; +import io.confluent.parallelconsumer.PollContextInternal; import io.confluent.parallelconsumer.state.ModelUtils; import io.confluent.parallelconsumer.state.WorkManager; import org.junit.jupiter.api.Test; @@ -51,11 +53,4 @@ void skip() { }); } - @Test - void offsets() { - run(SKIP, context -> { - throw new PCRetriableException("fake", Offsets.of(0, 1)); - }); - } - } From 14aad84efb6fb95da913d8f226cb1c36b59fba60 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 20 May 2022 15:59:00 +0100 Subject: [PATCH 19/19] review --- .../confluent/parallelconsumer/Offsets.java | 38 ------------------- .../internal/UserFunctionRunner.java | 5 ++- 2 files changed, 4 insertions(+), 39 deletions(-) delete mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java diff --git 
a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java deleted file mode 100644 index 4207ce1cd..000000000 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.confluent.parallelconsumer; - -/*- - * Copyright (C) 2020-2022 Confluent, Inc. - */ - -import lombok.ToString; -import lombok.experimental.Delegate; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -@ToString -public class Offsets { - - @Delegate - private final List rawOffsets; - - public Offsets(List records) { - this.rawOffsets = records; - } - - public static Offsets from(List> records) { - return of(records.stream() - .map(RecordContext::offset) - .collect(Collectors.toUnmodifiableList())); - } - - // due to type erasure, can't use method overloading - public static Offsets of(List rawOffsetsIn) { - return new Offsets(rawOffsetsIn); - } - - public static Offsets of(long... rawOffsetsIn) { - return new Offsets(Arrays.stream(rawOffsetsIn).boxed().collect(Collectors.toList())); - } -} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java index 4964b52f4..6e7ccf821 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java @@ -23,11 +23,14 @@ import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP; import static io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.MDC_WORK_CONTAINER_DESCRIPTOR; +/** + * Manages the result of running the user's function, particularly error handling. 
+ */ @AllArgsConstructor @Slf4j public class UserFunctionRunner { - private AbstractParallelEoSStreamProcessor pc; + private final AbstractParallelEoSStreamProcessor pc; /** * Run the supplied function.