Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

major: Extend Consumer and Function for more cohesive API #303

Closed
wants to merge 54 commits into from

Conversation

astubbs
Copy link
Contributor

@astubbs astubbs commented May 13, 2022

Experiment: extend Consumer and Function - centralises documentation and makes usage more explicit

This is a diverged fork of PR #268 - the relevant changes start from the commit titled:
Experiment: extend Consumer and Function - centralises documentation and makes usage more explicit

Checklist

  • Documentation (if applicable)
  • Changelog

Blocked by:

@astubbs astubbs marked this pull request as draft May 13, 2022 12:39
@astubbs astubbs changed the base branch from master to features/retry-exception May 13, 2022 12:41
@astubbs astubbs changed the base branch from features/retry-exception to master May 13, 2022 12:41
@rkolesnev
Copy link
Contributor

LGTM - nice addition i think.

@astubbs astubbs force-pushed the features/extend-functional branch from 16b13c7 to cc379ec Compare May 13, 2022 13:39
@astubbs astubbs changed the base branch from master to features/retry-exception May 13, 2022 13:41
@astubbs astubbs changed the base branch from features/retry-exception to master May 13, 2022 13:41
@astubbs astubbs force-pushed the features/extend-functional branch from cc379ec to 88e9f0e Compare May 13, 2022 14:28

/**
* A user's processing function can throw this exception, which signals to PC that processing of the message has failed,
* and that it should be retired at a later time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* and that it should be retired at a later time.
* and that it should be retried at a later time.

*
* @param records the Kafka records to process
*/
void accept(PollContext records);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void accept(PollContext records);
void accept(PollContext<K,V> records);

* @return the function result
*/
@Override
R apply(PollContext t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
R apply(PollContext t);
R apply(PollContext<K,V> t);

* @see PollConsumer#accept(PollContext)
*/
@Override
List<ProducerRecord<K, V>> apply(PollContext records);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
List<ProducerRecord<K, V>> apply(PollContext records);
List<ProducerRecord<K, V>> apply(PollContext<K,V> records);

}

@FunctionalInterface
interface PollConsumerAndProducer<K, V> extends java.util.function.Function<PollContext<K, V>, List<ProducerRecord<K, V>>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Staying with PollConsumerAndProducer ? Would MessageTransformer or Processor or Mapper sound better given the messages in / out nature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah - other thing is, if it affects your ideas - is it can produce many messages, not just one, from a single input

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth going to KStream DSL naming? have a map and flatmap where map takes 1 in - 1 out, and flatmap allows 1->many ? The typing needs fixed to allow transform of key / value anyway - so maybe can be done in separate PR that deals with it. Or Transform (with or without flatTransform).
Thinking about it - "Transformer" sounds best to me.
Are side effects allowed / expected in this scenario (i.e. external system calls for processing / enrichment)? If so something neutral like Processor would be ok i think.

Comment on lines +144 to +146
pc.pollAndProduceMany();
pc.poll(records ->);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

presumably - to be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I was playing with what the api ended up looking like

@astubbs astubbs changed the title Experiment: extend Consumer and Function major: Extend Consumer and Function for more cohesive API May 16, 2022
@astubbs astubbs linked an issue Jul 15, 2022 that may be closed by this pull request
2 tasks
@astubbs astubbs self-assigned this Jul 15, 2022
…xp/merge-retry

# Conflicts:
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMap.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/RemovedPartitionState.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
#	parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java
#	parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureTest.java
#	parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureUnitTest.java
#	parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingTests.java
#	parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java
#	parallel-consumer-core/src/test/resources/logback-test.xml
…unctional

# Conflicts:
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctions.java
@eddyv
Copy link
Member

eddyv commented Jun 15, 2023

Closing - Stale.

@eddyv eddyv closed this Jun 15, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants