From cc379ecd55a8bd134aac960f652f56cdccd78af7 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 13 May 2022 12:35:52 +0100 Subject: [PATCH] Experiment: extend Consumer and Function - centralises documentation and makes usage more explicit rename step step --- .../ParallelEoSStreamProcessor.java | 2 +- .../ParallelStreamProcessor.java | 8 ++-- .../parallelconsumer/RecordProcessor.java | 47 +++++++++++++++++++ .../AbstractParallelEoSStreamProcessor.java | 4 ++ .../internal/UserFunctions.java | 4 +- .../examples/core/CoreApp.java | 3 ++ 6 files changed, 61 insertions(+), 7 deletions(-) create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordProcessor.java diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java index e9486aab7..1a583b9e4 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java @@ -33,7 +33,7 @@ public ParallelEoSStreamProcessor(final ParallelConsumerOptions newOptions } @Override - public void poll(Consumer> usersVoidConsumptionFunction) { + public void poll(RecordProcessor.PollConsumer usersVoidConsumptionFunction) { Function, List> wrappedUserFunc = (context) -> { log.trace("asyncPoll - Consumed a consumerRecord ({}), executing void function...", context); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java index e4568145f..75a58dc1c 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java @@ -4,6 +4,7 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ +import io.confluent.parallelconsumer.RecordProcessor.PollConsumerAndProducer; import io.confluent.parallelconsumer.internal.DrainingCloseable; import lombok.Data; import org.apache.kafka.clients.producer.ProducerRecord; @@ -27,12 +28,11 @@ static ParallelStreamProcessor createEosStreamProcessor(Paralle } /** - * Register a function to be applied in parallel to each received message + * Register a function to be applied in parallel to each received message. * * @param usersVoidConsumptionFunction the function */ - void poll(Consumer> usersVoidConsumptionFunction); - + void poll(RecordProcessor.PollConsumer usersVoidConsumptionFunction); /** * Register a function to be applied in parallel to each received message, which in turn returns one or more {@link @@ -40,7 +40,7 @@ static ParallelStreamProcessor createEosStreamProcessor(Paralle * * @param callback applied after the produced message is acknowledged by kafka */ - void pollAndProduceMany(Function, List>> userFunction, + void pollAndProduceMany(PollConsumerAndProducer userFunction, Consumer> callback); /** diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordProcessor.java new file mode 100644 index 000000000..1d07f9a9e --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordProcessor.java @@ -0,0 +1,47 @@ +package io.confluent.parallelconsumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.List; + +/** + * Types of user functions used for processing records. + */ +public interface RecordProcessor { + + /** + * Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances. + */ + @FunctionalInterface + interface PollConsumer extends java.util.function.Consumer> { + + /** + * Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances. + *

+ * User can throw a {@link PCRetriableException}, if an issue is and PC should handle the process of retrying it + * later. If an exception is thrown that doesn't extend {@link PCRetriableException}, the error will be logged + * at {@code WARN} level. + * + * @param records the Kafka records to process + * @see PCRetriableException + * @see ParallelConsumerOptions#getRetryDelayProvider() + * @see ParallelConsumerOptions#getDefaultMessageRetryDelay() + */ + void accept(PollContext records); + } + + @FunctionalInterface + interface PollConsumerAndProducer extends java.util.function.Function, List>> { + + /** + * Like {@link PollConsumer#accept(PollContext)} but also returns records to be produced back to Kafka. + * + * @param records the Kafka records to process + * @return the function result + */ + @Override + List> apply(PollContext records); + + } +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 47150846c..9be22fce4 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -6,6 +6,10 @@ import io.confluent.csid.utils.TimeUtils; import io.confluent.parallelconsumer.*; +import io.confluent.parallelconsumer.PCRetriableException; +import io.confluent.parallelconsumer.ParallelConsumer; +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContextInternal; import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; import lombok.*; diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java index 0e88eb541..6c1a22f23 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java @@ -5,10 +5,10 @@ */ import io.confluent.parallelconsumer.ErrorInUserFunctionException; +import io.confluent.parallelconsumer.RecordProcessor; import lombok.experimental.UtilityClass; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; /** @@ -56,7 +56,7 @@ public static RESULT carefullyRun(Function wrappe * @param wrappedFunction the function to run * @param userFuncParam the parameter to pass into the user's function */ - public static void carefullyRun(Consumer wrappedFunction, PARAM userFuncParam) { + public static void carefullyRun(RecordProcessor.PollConsumer wrappedFunction, PARAM userFuncParam) { try { wrappedFunction.accept(userFuncParam); } catch (Exception e) { diff --git a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java index 4506566a0..1b1ac4889 100644 --- a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java +++ b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java @@ -141,6 +141,9 @@ void maxRetries() { final int maxRetries = 10; final Map, Long> retriesCount = new ConcurrentHashMap<>(); + pc.pollAndProduceMany(); + pc.poll(records ->); + pc.poll(context -> { var consumerRecord = context.getSingleRecord().getConsumerRecord(); Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);