Experiment: extend Consumer and Function - centralises documentation and makes usage more explicit

rename

step

step
astubbs committed May 13, 2022
1 parent a9ded2d commit cc379ec
Showing 6 changed files with 61 additions and 7 deletions.
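The gist of the experiment: instead of accepting raw java.util.function.Consumer and java.util.function.Function parameters, the public API accepts named sub-interfaces of those types, so the Javadoc for user functions lives in one place and the signatures say what they take. A minimal, self-contained illustration of the pattern (the names Demo, GreetingHandler and register are invented for this sketch and are not part of the library):

import java.util.function.Consumer;

public class Demo {

    // A named functional interface that extends the JDK type it replaces.
    // Existing lambdas still satisfy it, but the documentation and the
    // domain-specific name now live on the interface rather than being
    // repeated on every method parameter.
    @FunctionalInterface
    interface GreetingHandler extends Consumer<String> {
        /**
         * Handle one greeting. Centralised Javadoc for every call site.
         */
        @Override
        void accept(String greeting);
    }

    static void register(GreetingHandler handler) {
        handler.accept("hello");
    }

    public static void main(String[] args) {
        register(msg -> System.out.println(msg)); // lambdas work unchanged
    }
}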
@@ -33,7 +33,7 @@ public ParallelEoSStreamProcessor(final ParallelConsumerOptions<K, V> newOptions
}

@Override
public void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction) {
public void poll(RecordProcessor.PollConsumer<K, V> usersVoidConsumptionFunction) {
Function<PollContextInternal<K, V>, List<Object>> wrappedUserFunc = (context) -> {
log.trace("asyncPoll - Consumed a consumerRecord ({}), executing void function...", context);

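The hunk above ends mid-lambda, but its purpose is to adapt the user's void PollConsumer into the internal Function form the engine runs. A sketch of that adapter shape, under the assumption that the wrapper simply invokes the consumer and returns an empty result list (the real body is collapsed above, so this is illustrative only):

import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class VoidFunctionAdapterSketch {

    // Wrap a void consumer as a Function so the engine can treat it the same
    // way as user functions that return records to produce back to Kafka.
    static <T> Function<T, List<Object>> adapt(Consumer<T> voidConsumer) {
        return context -> {
            voidConsumer.accept(context);   // run the user's void function
            return Collections.emptyList(); // nothing to send to the broker
        };
    }
}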
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.RecordProcessor.PollConsumerAndProducer;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import lombok.Data;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -27,20 +28,19 @@ static <KK, VV> ParallelStreamProcessor<KK, VV> createEosStreamProcessor(Paralle
}

/**
* Register a function to be applied in parallel to each received message
* Register a function to be applied in parallel to each received message.
*
* @param usersVoidConsumptionFunction the function
*/
void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction);

void poll(RecordProcessor.PollConsumer<K, V> usersVoidConsumptionFunction);
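Call sites are unchanged by the new parameter type, because PollConsumer is itself a Consumer of PollContext. A minimal usage sketch, assuming a ParallelStreamProcessor<String, String> instance named parallelConsumer (the processing body is illustrative):

parallelConsumer.poll(context -> {
    var consumerRecord = context.getSingleRecord().getConsumerRecord();
    System.out.println("processing record with key " + consumerRecord.key());
});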

/**
* Register a function to be applied in parallel to each received message, which in turn returns one or more {@link
* ProducerRecord}s to be sent back to the broker.
*
* @param callback applied after the produced message is acknowledged by kafka
*/
void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction,
void pollAndProduceMany(PollConsumerAndProducer<K, V> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback);
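The two-argument form works the same way; a sketch under the same assumptions (the output topic name and the mapping are illustrative):

parallelConsumer.pollAndProduceMany(context -> {
            var consumerRecord = context.getSingleRecord().getConsumerRecord();
            // map the consumed record to one or more records for the broker
            return List.of(new ProducerRecord<>("output-topic", consumerRecord.key(), consumerRecord.value()));
        },
        result -> System.out.println("produced record acknowledged: " + result));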

/**
@@ -0,0 +1,47 @@
package io.confluent.parallelconsumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.List;

/**
* Types of user functions used for processing records.
*/
public interface RecordProcessor {

/**
* Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances.
*/
@FunctionalInterface
interface PollConsumer<K, V> extends java.util.function.Consumer<PollContext<K, V>> {

/**
* Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances.
* <p>
* User can throw a {@link PCRetriableException} if an issue occurs and PC should handle the process of retrying it
* later. If an exception is thrown that doesn't extend {@link PCRetriableException}, the error will be logged
* at {@code WARN} level.
*
* @param records the Kafka records to process
* @see PCRetriableException
* @see ParallelConsumerOptions#getRetryDelayProvider()
* @see ParallelConsumerOptions#getDefaultMessageRetryDelay()
*/
@Override
void accept(PollContext<K, V> records);
}

@FunctionalInterface
interface PollConsumerAndProducer<K, V> extends java.util.function.Function<PollContext<K, V>, List<ProducerRecord<K, V>>> {

/**
* Like {@link PollConsumer#accept(PollContext)} but also returns records to be produced back to Kafka.
*
* @param records the Kafka records to process
* @return the function result
*/
@Override
List<ProducerRecord<K, V>> apply(PollContext<K, V> records);

}
}
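Because both interfaces extend their java.util.function counterparts, lambdas written against the old signatures keep compiling, and the retry contract documented above can be exercised as sketched below (the null check, the process call and the variable names are illustrative, not part of the library):

RecordProcessor.PollConsumer<String, String> handler = context -> {
    var consumerRecord = context.getSingleRecord().getConsumerRecord();
    if (consumerRecord.value() == null) {
        // ask PC to retry this record later instead of logging a WARN
        throw new PCRetriableException("transient issue - retry later");
    }
    process(consumerRecord); // hypothetical user processing step
};
parallelConsumer.poll(handler);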
@@ -6,6 +6,10 @@

import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.*;
import io.confluent.parallelconsumer.PCRetriableException;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.*;
@@ -5,10 +5,10 @@
*/

import io.confluent.parallelconsumer.ErrorInUserFunctionException;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.RecordProcessor;
import lombok.experimental.UtilityClass;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

/**
@@ -56,7 +56,7 @@ public static <PARAM, RESULT> RESULT carefullyRun(Function<PARAM, RESULT> wrappe
* @param wrappedFunction the function to run
* @param userFuncParam the parameter to pass into the user's function
*/
public static <PARAM> void carefullyRun(Consumer<PARAM> wrappedFunction, PARAM userFuncParam) {
public static <K, V> void carefullyRun(RecordProcessor.PollConsumer<K, V> wrappedFunction, PollContext<K, V> userFuncParam) {
try {
wrappedFunction.accept(userFuncParam);
} catch (Exception e) {
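// The rest of the method is collapsed in this view. Given the
// ErrorInUserFunctionException import above, the catch block presumably wraps
// and rethrows the user's exception, roughly like this (a sketch, not the
// verbatim source):
throw new ErrorInUserFunctionException("Error occurred in code supplied by user", e);
}
}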
@@ -141,6 +141,9 @@ void maxRetries()
final int maxRetries = 10;
final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();


pc.poll(context -> {
var consumerRecord = context.getSingleRecord().getConsumerRecord();
Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);
