-
Notifications
You must be signed in to change notification settings - Fork 139
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
major: Extend Consumer and Function for more cohesive API #303
Closed
Closed
Changes from all commits
Commits
Show all changes
54 commits
Select commit
Hold shift + click to select a range
21f08f6
START: the basics of a single queue
astubbs 3f1f3ff
step: remove work mailbox manager
astubbs 92b50cb
step: BROKEN: assign epoch to record immediately
astubbs 1653bc2
step - trying to test perf
astubbs 103c677
update
astubbs eed2190
logs
astubbs 02ee3cd
fix: Debug output for sorted encoding pairs
astubbs 83fda73
save
astubbs 2a37d46
rebase update
astubbs b17c838
step
astubbs e1141e4
save
astubbs 5a3bb55
save
astubbs 6a1464c
save: unit test version of offset encoding backpressure test
astubbs 0604934
save
astubbs 4eeb008
omg - hashsets vs queues, wow
astubbs bcfc9c1
review
astubbs 6054ac5
review
astubbs c968629
review
astubbs 0f993dd
review
astubbs 416fd2f
Merge remote-tracking branch 'confluent/master' into features/single-…
astubbs 189dc59
step
astubbs 908d8ed
step
astubbs 7547ec6
fix test
astubbs 939a15e
step - test fix?
astubbs 3fa6ae3
step - test fix?
astubbs c44f50a
step - test fix? - make sure unit test also cleans up
astubbs eff0b13
step - test fix? - make sure unit test also cleans up
astubbs a6dc167
START: Explicit retry exception for cleaner logging
astubbs 74e0efb
step: reduce consumer max poll
astubbs ae1ce22
step: loosen duplicate check a bit for jenkins
astubbs 62ffa63
step: fix generics
astubbs bf4452e
step: Experiment: synchronisation no longer needed due to stronger ep…
astubbs 94ebc5c
turn max poll back to default (500)
astubbs 1e8fcd9
license
astubbs b85fd2d
review
astubbs c6056fe
review
astubbs 333ccac
review
astubbs d751aa4
review
astubbs aa5c0e1
fix
astubbs 3b51ffe
START: Rename PartitionMonitor to PartitionStateManager
astubbs 1b087e5
Merge branch 'features/single-queue' into features/retry-exception
astubbs 88e9f0e
START: Experiment: extend Consumer and Function - centralises documen…
astubbs 6862a56
step
astubbs 14f1d47
refactor #409: Clarify truncation code
astubbs 24c61f1
refactor #409: Clarify truncation code - tests passing
astubbs 62be60b
refactor #409: Clarify truncation code - tests passing
astubbs 2122e9e
bump jabel for j19
astubbs 0131d35
cleanup
astubbs 158ca87
fix
astubbs a66b673
Merge remote-tracking branch 'origin/features/retry-exception' into e…
astubbs 40ba4e3
merge fix
astubbs f97c3cb
migrate
astubbs 8a996af
Merge branch 'features/retry-exception-simple' into features/extend-f…
astubbs f831137
step
astubbs File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
23 changes: 23 additions & 0 deletions
23
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCRetriableException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package io.confluent.parallelconsumer; | ||
|
||
/*- | ||
* Copyright (C) 2020-2022 Confluent, Inc. | ||
*/ | ||
|
||
import lombok.experimental.StandardException; | ||
|
||
/** | ||
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed, | ||
* and that it should be retired at a later time. | ||
* <p> | ||
* The advantage of throwing this exception explicitly, is that PC will not log an ERROR. If any other type of exception | ||
* is thrown by the user's function, that will be logged as an error (but will still be retried later). | ||
* <p> | ||
* So in short, if this exception is thrown, nothing will be logged (except at DEBUG level), any other exception will be | ||
* logged as an error. | ||
* | ||
* @author Antony Stubbs | ||
*/ | ||
@StandardException | ||
public class PCRetriableException extends RuntimeException { | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
50 changes: 50 additions & 0 deletions
50
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/UserFunctions.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package io.confluent.parallelconsumer; | ||
|
||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Types of user functions used for processing records. | ||
* | ||
* @author Antony Stubbs | ||
*/ | ||
public interface UserFunctions { | ||
|
||
/** | ||
* Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances. | ||
*/ | ||
@FunctionalInterface | ||
interface Processor<K, V> {// extends java.util.function.Consumer<PollContext<K, V>> { | ||
|
||
/** | ||
* Process a Kafka {@link ConsumerRecord} via {@link PollContext} instances. | ||
* <p> | ||
* User can throw a {@link PCRetriableException}, if an issue is and PC should handle the process of retrying it | ||
* later. If an exception is thrown that doesn't extend {@link PCRetriableException}, the error will be logged | ||
* at {@code WARN} level. Note that, by default, any exception thrown from a users function will cause the | ||
* record to be retried, as if a {@link PCRetriableException} had actually been thrown. | ||
* | ||
* @param records the Kafka records to process | ||
* @see PCRetriableException | ||
* @see ParallelConsumerOptions#getRetryDelayProvider() | ||
* @see ParallelConsumerOptions#getDefaultMessageRetryDelay() | ||
*/ | ||
void process(PollContext<K, V> records); | ||
} | ||
|
||
@FunctionalInterface | ||
interface Transformer<K, V> { //extends java.util.function.Function<PollContext<K, V>, List<ProducerRecord<K, V>>> { | ||
|
||
/** | ||
* Like {@link Processor#process(PollContext)} but also returns records to be produced back to Kafka. | ||
* | ||
* @param records the Kafka records to process | ||
* @return the function result | ||
* @see Processor#process(PollContext) | ||
*/ | ||
List<ProducerRecord<K, V>> flatMap(PollContext<K, V> records); | ||
|
||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.