major: Extend Consumer and Function for more cohesive API #303

Closed
wants to merge 54 commits

Commits (54)
21f08f6
START: the basics of a single queue
astubbs Mar 8, 2022
3f1f3ff
step: remove work mailbox manager
astubbs Mar 8, 2022
92b50cb
step: BROKEN: assign epoch to record immediately
astubbs Mar 8, 2022
1653bc2
step - trying to test perf
astubbs Mar 17, 2022
103c677
update
astubbs Mar 17, 2022
eed2190
logs
astubbs Mar 17, 2022
02ee3cd
fix: Debug output for sorted encoding pairs
astubbs Mar 17, 2022
83fda73
save
astubbs Mar 17, 2022
2a37d46
rebase update
astubbs Mar 25, 2022
b17c838
step
astubbs Apr 4, 2022
e1141e4
save
astubbs Apr 5, 2022
5a3bb55
save
astubbs Apr 5, 2022
6a1464c
save: unit test version of offset encoding backpressure test
astubbs Apr 5, 2022
0604934
save
astubbs Apr 5, 2022
4eeb008
omg - hashsets vs queues, wow
astubbs Apr 5, 2022
bcfc9c1
review
astubbs Apr 6, 2022
6054ac5
review
astubbs Apr 6, 2022
c968629
review
astubbs Apr 6, 2022
0f993dd
review
astubbs Apr 6, 2022
416fd2f
Merge remote-tracking branch 'confluent/master' into features/single-…
astubbs Apr 21, 2022
189dc59
step
astubbs Apr 21, 2022
908d8ed
step
astubbs Apr 21, 2022
7547ec6
fix test
astubbs Apr 21, 2022
939a15e
step - test fix?
astubbs Apr 21, 2022
3fa6ae3
step - test fix?
astubbs Apr 21, 2022
c44f50a
step - test fix? - make sure unit test also cleans up
astubbs Apr 21, 2022
eff0b13
step - test fix? - make sure unit test also cleans up
astubbs Apr 21, 2022
a6dc167
START: Explicit retry exception for cleaner logging
astubbs Apr 21, 2022
74e0efb
step: reduce consumer max poll
astubbs Apr 21, 2022
ae1ce22
step: loosen duplicate check a bit for jenkins
astubbs Apr 21, 2022
62ffa63
step: fix generics
astubbs Apr 21, 2022
bf4452e
step: Experiment: synchronisation no longer needed due to stronger ep…
astubbs Apr 21, 2022
94ebc5c
turn max poll back to default (500)
astubbs Apr 21, 2022
1e8fcd9
license
astubbs Apr 21, 2022
b85fd2d
review
astubbs Apr 22, 2022
c6056fe
review
astubbs Apr 22, 2022
333ccac
review
astubbs Apr 22, 2022
d751aa4
review
astubbs Apr 22, 2022
aa5c0e1
fix
astubbs Apr 22, 2022
3b51ffe
START: Rename PartitionMonitor to PartitionStateManager
astubbs Apr 22, 2022
1b087e5
Merge branch 'features/single-queue' into features/retry-exception
astubbs Apr 22, 2022
88e9f0e
START: Experiment: extend Consumer and Function - centralises documen…
astubbs May 13, 2022
6862a56
step
astubbs May 13, 2022
14f1d47
refactor #409: Clarify truncation code
astubbs Oct 19, 2022
24c61f1
refactor #409: Clarify truncation code - tests passing
astubbs Oct 19, 2022
62be60b
refactor #409: Clarify truncation code - tests passing
astubbs Oct 19, 2022
2122e9e
bump jabel for j19
astubbs Oct 19, 2022
0131d35
cleanup
astubbs Oct 19, 2022
158ca87
fix
astubbs Oct 19, 2022
a66b673
Merge remote-tracking branch 'origin/features/retry-exception' into e…
astubbs Oct 19, 2022
40ba4e3
merge fix
astubbs Oct 19, 2022
f97c3cb
migrate
astubbs Oct 19, 2022
8a996af
Merge branch 'features/retry-exception-simple' into features/extend-f…
astubbs Oct 20, 2022
f831137
step
astubbs Oct 20, 2022

Changes from all commits:
@@ -0,0 +1,23 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.experimental.StandardException;

/**
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed,
* and that it should be retired at a later time.
Contributor comment, suggested change:
- * and that it should be retired at a later time.
+ * and that it should be retried at a later time.

* <p>
* The advantage of throwing this exception explicitly is that PC will not log an ERROR. If any other type of exception
* is thrown by the user's function, that will be logged as an error (but will still be retried later).
* <p>
* So in short, if this exception is thrown, nothing will be logged (except at DEBUG level); any other exception will be
* logged as an error.
*
* @author Antony Stubbs
*/
@StandardException
public class PCRetriableException extends RuntimeException {
}
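
For illustration, a minimal usage sketch of the new exception, assuming an already-built ParallelStreamProcessor named pc; the record accessor and downstreamService are illustrative stand-ins, not part of this PR:

pc.poll(context -> {
    var consumerRecord = context.getSingleConsumerRecord(); // accessor name assumed
    // hypothetical downstream call that can fail transiently
    if (!downstreamService.trySend(consumerRecord.value())) {
        // logged at DEBUG only; the record is marked failed and retried later
        throw new PCRetriableException("downstream unavailable, will retry");
    }
});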
@@ -47,7 +47,7 @@ public ParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
}

@Override
public void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction) {
public void poll(UserFunctions.Processor<K, V> usersVoidConsumptionFunction) {
Function<PollContextInternal<K, V>, List<Object>> wrappedUserFunc = (context) -> {
log.trace("asyncPoll - Consumed a consumerRecord ({}), executing void function...", context);

@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.UserFunctions.Transformer;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import lombok.Data;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -27,21 +28,20 @@ static <KK, VV> ParallelStreamProcessor<KK, VV> createEosStreamProcessor(Paralle
}

/**
* Register a function to be applied in parallel to each received message
* Register a function to be applied in parallel to each received message.
*
* @param usersVoidConsumptionFunction the function
*/
// todo why isn't this in ParallelConsumer ?
void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction);

void poll(UserFunctions.Processor<K, V> usersVoidConsumptionFunction);

/**
* Register a function to be applied in parallel to each received message, which in turn returns one or more {@link
* ProducerRecord}s to be sent back to the broker.
* Register a function to be applied in parallel to each received message, which in turn returns one or more
* {@link ProducerRecord}s to be sent back to the broker.
*
* @param callback applied after the produced message is acknowledged by kafka
*/
void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction,
void pollAndProduceMany(Transformer<K, V> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback);

/**
@@ -0,0 +1,50 @@
package io.confluent.parallelconsumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.List;

/**
* Types of user functions used for processing records.
*
* @author Antony Stubbs
*/
public interface UserFunctions {

/**
* Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances.
*/
@FunctionalInterface
interface Processor<K, V> { // extends java.util.function.Consumer<PollContext<K, V>>

/**
* Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances.
* <p>
* User can throw a {@link PCRetriableException} if an issue is encountered and PC should handle the process of retrying it
* later. If an exception is thrown that doesn't extend {@link PCRetriableException}, the error will be logged
* at {@code ERROR} level. Note that, by default, any exception thrown from a user's function will cause the
* record to be retried, as if a {@link PCRetriableException} had actually been thrown.
*
* @param records the Kafka records to process
* @see PCRetriableException
* @see ParallelConsumerOptions#getRetryDelayProvider()
* @see ParallelConsumerOptions#getDefaultMessageRetryDelay()
*/
void process(PollContext<K, V> records);
}

@FunctionalInterface
interface Transformer<K, V> { // extends java.util.function.Function<PollContext<K, V>, List<ProducerRecord<K, V>>>

/**
* Like {@link Processor#process(PollContext)} but also returns records to be produced back to Kafka.
*
* @param records the Kafka records to process
* @return the records to produce back to Kafka
* @see Processor#process(PollContext)
*/
List<ProducerRecord<K, V>> flatMap(PollContext<K, V> records);

}
}
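
Both types are @FunctionalInterface, so existing lambda call sites keep compiling; a short sketch under assumed names (the processor instance and output topic are illustrative):

UserFunctions.Processor<String, String> printer =
        records -> System.out.println("processing batch: " + records);

UserFunctions.Transformer<String, String> echo =
        records -> List.of(new ProducerRecord<>("output-topic", "echoed-value"));

processor.poll(printer);
processor.pollAndProduceMany(echo, result -> System.out.println("acked: " + result));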
@@ -5,10 +5,7 @@
*/

import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ExceptionInUserFunctionException;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.*;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.*;
@@ -1174,7 +1171,15 @@ protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunct
return intermediateResults;
} catch (Exception e) {
// handle fail
log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox. Context: {}", context, e);
var cause = e.getCause();
String msg = msg("Exception caught in user function running stage, registering WC as failed, returning to" +
" mailbox. Context: {}", context, e);
if (cause instanceof PCRetriableException) {
log.debug("Explicit " + PCRetriableException.class.getSimpleName() + " caught, logging at DEBUG only. " + msg, e);
} else {
log.error(msg, e);
}

for (var wc : workContainerBatch) {
wc.onUserFunctionFailure(e);
addToMailbox(context, wc); // always add on error
@@ -5,14 +5,17 @@
*/

import io.confluent.parallelconsumer.ExceptionInUserFunctionException;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.UserFunctions.Processor;
import lombok.experimental.UtilityClass;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Single entry point for wrapping the actual execution of user functions
*
* @author Antony Stubbs
*/
@UtilityClass
public class UserFunctions {
@@ -43,7 +46,7 @@ public static <PARAM_ONE, PARAM_TWO, RESULT> RESULT carefullyRun(BiFunction<PARA
* @param wrappedFunction the function to run
* @param userFuncParam the parameter to pass into the user's function
*/
public static <PARAM, RESULT> RESULT carefullyRun(Function<PARAM, RESULT> wrappedFunction, PARAM userFuncParam) {
public static <K, V, RESULT> RESULT carefullyRun(Function<PollContext<K, V>, RESULT> wrappedFunction, PollContext<K, V> userFuncParam) {
try {
return wrappedFunction.apply(userFuncParam);
} catch (Throwable e) {
@@ -56,9 +59,9 @@ public static <PARAM, RESULT> RESULT carefullyRun(Function<PARAM, RESULT> wrappe
* @param wrappedFunction the function to run
* @param userFuncParam the parameter to pass into the user's function
*/
public static <PARAM> void carefullyRun(Consumer<PARAM> wrappedFunction, PARAM userFuncParam) {
public static <K, V> void carefullyRun(Processor<K, V> wrappedFunction, PollContext<K, V> userFuncParam) {
try {
wrappedFunction.accept(userFuncParam);
wrappedFunction.process(userFuncParam);
} catch (Throwable e) {
throw new ExceptionInUserFunctionException(MSG, e);
}
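For orientation, a hedged sketch of the call pattern around this wrapper (the surrounding worker-loop names are assumptions): any Throwable thrown by the user's function is normalised into ExceptionInUserFunctionException, so callers only handle one failure type.

// sketch only: names of the surrounding pieces are illustrative
void runStage(Processor<String, String> userFunc, PollContext<String, String> context) {
    try {
        UserFunctions.carefullyRun(userFunc, context);
    } catch (ExceptionInUserFunctionException e) {
        // single failure type: mark the work container failed and requeue it for retry
    }
}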
@@ -301,42 +301,47 @@ public void addNewIncompleteRecord(ConsumerRecord<K, V> record) {

/**
* If the offset is higher than expected, according to the previously committed / polled offset, truncate up to it.
* Offsets in between have disappeared and will never be polled again.
* If lower, reset down to it.
* <p>
* Only runs if this is the first {@link WorkContainer} to be added since instantiation.
* Only runs if this is the first {@link ConsumerRecord} to be added since instantiation.
* <p>
* Can be caused by the offset reset policy of the underlying consumer.
*/
private void maybeTruncateBelowOrAbove(long polledOffset) {
private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
if (bootstrapPhase) {
bootstrapPhase = false;
} else {
// Not bootstrap phase anymore, so not checking for truncation
return;
}

long expectedBootstrapRecordOffset = getNextExpectedInitialPolledOffset();
// during bootstrap, getOffsetToCommit() returns the next offset expected to be polled, derived from the loaded commit data, so we can use that to determine if we need to truncate
long expectedBootstrapRecordOffset = getOffsetToCommit();

boolean pollAboveExpected = polledOffset > expectedBootstrapRecordOffset;
boolean pollAboveExpected = bootstrapPolledOffset > expectedBootstrapRecordOffset;

boolean pollBelowExpected = polledOffset < expectedBootstrapRecordOffset;
boolean pollBelowExpected = bootstrapPolledOffset < expectedBootstrapRecordOffset;

if (pollAboveExpected) {
// previously committed offset record has been removed, or manual reset to higher offset detected
log.warn("Truncating state - removing records lower than {}. Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled {} but " +
"expected {} from loaded commit data. Could be caused by record retention or compaction.",
polledOffset,
polledOffset,
// previously committed offset record has been removed from the topic, so we need to truncate up to it
log.warn("Truncating state - removing records lower than {}. Offsets have been removed from the partition " +
"by the broker or committed offset has been raised. Bootstrap polled {} but expected {} from loaded commit data. " +
"Could be caused by record retention or compaction and offset reset policy LATEST.",
bootstrapPolledOffset,
bootstrapPolledOffset,
expectedBootstrapRecordOffset);

// truncate
final NavigableSet<Long> incompletesToPrune = incompleteOffsets.keySet().headSet(polledOffset, false);
final NavigableSet<Long> incompletesToPrune = incompleteOffsets.keySet().headSet(bootstrapPolledOffset, false);
incompletesToPrune.forEach(incompleteOffsets::remove);
} else if (pollBelowExpected) {
// manual reset to lower offset detected
// reset to lower offset detected, so we need to reset our state to match
log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) - truncating state - all records " +
"above (including this) will be replayed. Was expecting {} but bootstrap poll was {}.",
polledOffset,
"above (including this) will be replayed. Was expecting {} but bootstrap poll was {}. " +
"Could be caused by record retention or compaction and offset reset policy EARLIEST.",
bootstrapPolledOffset,
expectedBootstrapRecordOffset,
polledOffset
bootstrapPolledOffset
);

// reset
@@ -365,7 +370,7 @@ public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
// visible for testing
protected OffsetAndMetadata createOffsetAndMetadata() {
Optional<String> payloadOpt = tryToEncodeOffsets();
long nextOffset = getNextExpectedInitialPolledOffset();
long nextOffset = getOffsetToCommit();
return payloadOpt
.map(encodedOffsets -> new OffsetAndMetadata(nextOffset, encodedOffsets))
.orElseGet(() -> new OffsetAndMetadata(nextOffset));
@@ -374,10 +379,10 @@ protected OffsetAndMetadata createOffsetAndMetadata() {
/**
* Next offset expected to be polled, upon freshly connecting to a broker.
* <p>
* Defines as the offset one below the highest sequentially succeeded offset.
* Defined as the offset one above the highest sequentially succeeded offset.
*/
// visible for testing
protected long getNextExpectedInitialPolledOffset() {
protected long getOffsetToCommit() {
return getOffsetHighestSequentialSucceeded() + 1;
}

@@ -443,7 +448,7 @@ private Optional<String> tryToEncodeOffsets() {
try {
// todo refactor use of null shouldn't be needed. Is OffsetMapCodecManager stateful? remove null #233
OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<>(null);
long offsetOfNextExpectedMessage = getNextExpectedInitialPolledOffset();
long offsetOfNextExpectedMessage = getOffsetToCommit();
String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, this);
boolean mustStrip = updateBlockFromEncodingResult(offsetMapPayload);
if (mustStrip) {
@@ -523,20 +528,20 @@ private void maybeTruncateOrPruneTrackedOffsets(EpochAndRecordsMap<?, ?>.Records
return;
}

var low = getFirst(records).get().offset(); // NOSONAR see #isEmpty
var lowOffset = getFirst(records).get().offset(); // NOSONAR see #isEmpty

maybeTruncateBelowOrAbove(low);
maybeTruncateBelowOrAbove(lowOffset);

// build the hash set once, so we can do random access checks of our tracked incompletes
var polledOffsetLookup = records.stream()
.map(ConsumerRecord::offset)
.collect(Collectors.toSet());

var high = getLast(records).get().offset(); // NOSONAR see #isEmpty
var highOffset = getLast(records).get().offset(); // NOSONAR see #isEmpty

// for the incomplete offsets within this range of poll batch
var incompletesWithinPolledBatch = incompleteOffsets.keySet().subSet(low, true, high, true);
var offsetsToRemoveFromTracking = new ArrayList<Long>();
var incompletesWithinPolledBatch = incompleteOffsets.keySet().subSet(lowOffset, true, highOffset, true);
for (long incompleteOffset : incompletesWithinPolledBatch) {
boolean offsetMissingFromPolledRecords = !polledOffsetLookup.contains(incompleteOffset);

@@ -553,8 +558,8 @@ private void maybeTruncateOrPruneTrackedOffsets(EpochAndRecordsMap<?, ?>.Records
"base offset, after initial load and before a rebalance.",
offsetsToRemoveFromTracking,
getTp(),
low,
high
lowOffset,
highOffset
);
offsetsToRemoveFromTracking.forEach(incompleteOffsets::remove);
}
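To make the bootstrap comparison concrete, a simplified sketch of the truncation decision above; bare longs and a plain map stand in for the real partition state, and resetStateTo() is a hypothetical helper:

import java.util.concurrent.ConcurrentSkipListMap;

class TruncationSketch {
    final ConcurrentSkipListMap<Long, Object> incompleteOffsets = new ConcurrentSkipListMap<>();

    void onBootstrapPoll(long bootstrapPolledOffset, long expectedOffset /* getOffsetToCommit() */) {
        if (bootstrapPolledOffset > expectedOffset) {
            // broker skipped ahead (retention/compaction, or reset policy LATEST):
            // offsets below the polled one will never be seen again, so drop them
            incompleteOffsets.headMap(bootstrapPolledOffset, false).clear();
        } else if (bootstrapPolledOffset < expectedOffset) {
            // poll position moved backwards (e.g. reset policy EARLIEST): earlier
            // records will be replayed, so rebuild tracking from the new base offset
            resetStateTo(bootstrapPolledOffset);
        }
        // equal: normal bootstrap, nothing to truncate
    }

    void resetStateTo(long baseOffset) { /* hypothetical: clear and re-seed tracked state */ }
}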
@@ -6,6 +6,7 @@

import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
@@ -257,7 +258,7 @@ private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) {
int i = Integer.parseInt(rec.value());
if (stepIndex != i) {
log.error("bad step: {} vs {}", stepIndex, i);
throw new RuntimeException("bad process step, expected message is missing: " + stepIndex + " vs " + i);
throw new FakeRuntimeException("bad process step, expected message is missing: " + stepIndex + " vs " + i);
}
stepIndex++;
}
@@ -284,7 +285,7 @@ private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) {
}
if (!missing.isEmpty())
log.error("Missing: {}", missing);
throw new RuntimeException("bad step, expected message(s) is missing: " + missing);
throw new FakeRuntimeException("bad step, expected message(s) is missing: " + missing);
}

assertThat(producerSpy.history()).as("Finally, all messages expected messages were produced").hasSize(quantityOfMessagesToProduce);
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder;
@@ -157,7 +158,7 @@ private void runPcAndBlockRecordsOverLimitIndex(int blockOver) {
log.debug(msg("{} over block limit of {}, blocking...", index, blockOver));
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
throw new FakeRuntimeException(e);
}
}
});
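
FakeRuntimeException is imported above but not shown in this view; presumably it is a trivial test-only marker type, so that deliberately thrown test failures are distinguishable from genuine bugs. Roughly, under that assumption:

// assumption: a minimal test-only marker exception, constructors generated by Lombok
@StandardException
public class FakeRuntimeException extends RuntimeException {
}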