Allow parallel consumer to work with subclasses of Apache KafkaConsumer #564

Closed
mrabhishek opened this issue Mar 22, 2023 · 2 comments

mrabhishek commented Mar 22, 2023

I extend the stock Apache KafkaConsumer class to add some behavior to its existing poll method. However, because parallel-consumer reads private fields from the KafkaConsumer class via reflection, it does not work when an instance of my subclass is provided as the consumer.

The error is specifically at this line.

if (consumer instanceof KafkaConsumer) {
     // Could use Commons Lang FieldUtils#readField - but, avoid needing commons lang
     Field coordinatorField = KafkaConsumer.class.getDeclaredField("coordinator");

If we modified this check to look for the field in the consumer's own class and then in its parent class(es), things should work fine. Let me know if that is an acceptable fix and I'll open a PR.
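To illustrate the kind of lookup I have in mind, here is a minimal sketch using plain Java reflection. It is not parallel-consumer code: `BaseConsumer`, `CustomConsumer`, `findField`, and `readField` are hypothetical names, with `BaseConsumer` standing in for KafkaConsumer and its private `coordinator` field.

```java
import java.lang.reflect.Field;

public class HierarchyFieldLookup {

    // Hypothetical stand-in for KafkaConsumer with a private field.
    static class BaseConsumer {
        private final String coordinator = "base-coordinator";
    }

    // Hypothetical stand-in for a user subclass such as CustomKafkaConsumer.
    static class CustomConsumer extends BaseConsumer {
    }

    // Walks from the given class up through its superclasses and returns
    // the first declared field with the given name.
    static Field findField(Class<?> type, String name) throws NoSuchFieldException {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException ignored) {
                // Not declared on this class; continue with the parent.
            }
        }
        throw new NoSuchFieldException(
                name + " not found on " + type.getName() + " or its superclasses");
    }

    // Reads a (possibly private, possibly inherited) field from the target object.
    static Object readField(Object target, String name) throws ReflectiveOperationException {
        Field field = findField(target.getClass(), name);
        field.setAccessible(true);
        return field.get(target);
    }

    public static void main(String[] args) throws Exception {
        // The field is declared on BaseConsumer but is found via the subclass instance.
        System.out.println(readField(new CustomConsumer(), "coordinator"));
    }
}
```

Starting from the runtime class and climbing with `getSuperclass()` means the same code path would serve both the stock class and any subclass of it.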

For details, my consumer looks something like this:

public class CustomKafkaConsumer<K, V> extends KafkaConsumer<K, V> {
}

and I want to use this with parallel consumer like this.

ParallelConsumerOptions.<String, byte[]>builder()
                .consumer(new CustomKafkaConsumer())
                .maxConcurrency(2)
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC)
                .build();

mrabhishek commented Mar 24, 2023

This seems to have been fixed in 0.5.2.5 as part of #195. Closing as duplicate.

Kirikx commented Jun 5, 2023

@mrabhishek No, the problem has not been resolved. I get a similar error when using Spring Boot Kafka (PC version: 0.5.2.5):
c.p.i.AbstractParallelEoSStreamProcessor : Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: brave.kafka.clients.TracingConsumer
