-
Notifications
You must be signed in to change notification settings - Fork 137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
major: Extend Consumer and Function for more cohesive API #303
Conversation
PriorityQueue only provides a sorted `poll`, whereas TreeSet iterates in sorted order.
…queue # Conflicts: # parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/truth/TruthGeneratorTests.java
LGTM - nice addition i think. |
…tation and makes usage more explicit
16b13c7
to
cc379ec
Compare
cc379ec
to
88e9f0e
Compare
|
||
/** | ||
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed, | ||
* and that it should be retired at a later time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* and that it should be retired at a later time. | |
* and that it should be retried at a later time. |
* | ||
* @param records the Kafka records to process | ||
*/ | ||
void accept(PollContext records); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
void accept(PollContext records); | |
void accept(PollContext<K,V> records); |
* @return the function result | ||
*/ | ||
@Override | ||
R apply(PollContext t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
R apply(PollContext t); | |
R apply(PollContext<K,V> t); |
* @see PollConsumer#accept(PollContext) | ||
*/ | ||
@Override | ||
List<ProducerRecord<K, V>> apply(PollContext records); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List<ProducerRecord<K, V>> apply(PollContext records); | |
List<ProducerRecord<K, V>> apply(PollContext<K,V> records); |
} | ||
|
||
@FunctionalInterface | ||
interface PollConsumerAndProducer<K, V> extends java.util.function.Function<PollContext<K, V>, List<ProducerRecord<K, V>>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Staying with PollConsumerAndProducer ? Would MessageTransformer or Processor or Mapper sound better given the messages in / out nature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah - other thing is, if it affects your ideas - is it can produce many messages, not just one, from a single input
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be worth going to KStream DSL naming? have a map and flatmap where map takes 1 in - 1 out, and flatmap allows 1->many ? The typing needs fixed to allow transform of key / value anyway - so maybe can be done in separate PR that deals with it. Or Transform (with or without flatTransform).
Thinking about it - "Transformer" sounds best to me.
Are side effects allowed / expected in this scenario (i.e. external system calls for processing / enrichment)? If so something neutral like Processor would be ok i think.
pc.pollAndProduceMany(); | ||
pc.poll(records ->); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
presumably - to be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I was playing with what the api ended up looking like
…xp/merge-retry # Conflicts: # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMap.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/RemovedPartitionState.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java # parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java # parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureTest.java # parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureUnitTest.java # parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingTests.java # parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java # parallel-consumer-core/src/test/resources/logback-test.xml
…unctional # Conflicts: # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java # parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java
Closing - Stale. |
Experiment: extend Consumer and Function - centralises documentation and makes usage more explicit
This is a diverged fork of PR #268 - the relevant changes start from the commit titled:
Experiment: extend Consumer and Function - centralises documentation and makes usage more explicit
Checklist
Blocked by: