
NoSuchFieldException when using consumer inherited from KafkaConsumer #195

Closed
Tracked by #172
micr0farad opened this issue Feb 15, 2022 · 8 comments · Fixed by #469

micr0farad commented Feb 15, 2022

Issue description

Hey!
I'm trying to skip malformed messages by inheriting from KafkaConsumer and altering the behavior of the poll() method:

import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class ExceptionHandlingKafkaConsumer<K, V> extends KafkaConsumer<K, V> {

    public ExceptionHandlingKafkaConsumer(final Map<String, Object> configs) {
        super(configs);
    }

    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        try {
            return super.poll(timeout);
        } catch (Exception e) {
            if (!deserializationError(e)) { // our own helper: detects deserialization failures
                throw e;
            }
            // partition and offset retrieved from the exception instance
            this.seek(topicPartition, offset + 1);
            return ConsumerRecords.empty();
        }
    }
}

But then I get this error on startup:

java.lang.NoSuchFieldException: coordinator
	at java.base/java.lang.Class.getDeclaredField(Class.java:2411)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:343)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:199)
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:34)
	at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:28)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig.mediaParallelConsumer(MediaEventsReceiverConfig.java:69)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec.CGLIB$mediaParallelConsumer$1(<generated>)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec$$FastClassBySpringCGLIB$$ff4101fd.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
	at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec.mediaParallelConsumer(<generated>)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
	... 49 common frames omitted

Possible solutions

Workaround

Use composition instead of inheritance, so that AbstractParallelEoSStreamProcessor will skip this check.
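
A minimal sketch of that workaround, assuming a hand-rolled delegating wrapper (the class name is illustrative, and most Consumer methods are elided for brevity):

import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical sketch: the wrapper implements Consumer but does not extend
// KafkaConsumer, so the reflective auto-commit check is skipped for it.
class DelegatingExceptionHandlingConsumer<K, V> implements Consumer<K, V> {

    private final KafkaConsumer<K, V> delegate;

    DelegatingExceptionHandlingConsumer(final Map<String, Object> configs) {
        this.delegate = new KafkaConsumer<>(configs);
    }

    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        // The same error handling as in the inheritance version would go here.
        return delegate.poll(timeout);
    }

    // ... all remaining Consumer methods simply forward to `delegate`
}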

Proper fix

Probably implement the enable.auto.commit check without using reflection, as there's no guarantee that the consumer inherits directly from KafkaConsumer, so we can't reliably use #getSuperclass() on it.
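
For context, Class#getDeclaredField only inspects the exact class it is called on and never its superclasses, which is why the startup check blows up on a subclass. A minimal illustration, assuming the private field is still named coordinator:

import java.lang.reflect.Field;

import org.apache.kafka.clients.consumer.Consumer;

class ReflectionLookupIllustration {

    // getDeclaredField never searches superclasses, so this lookup fails
    // whenever `consumer` is an instance of a KafkaConsumer subclass.
    static Field lookUpCoordinator(Consumer<?, ?> consumer) throws NoSuchFieldException {
        return consumer.getClass().getDeclaredField("coordinator"); // NoSuchFieldException for subclasses
    }
}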

astubbs mentioned this issue Mar 23, 2022

astubbs commented May 6, 2022

Probably implement the enable.auto.commit check without using reflection

I would looooove to do that, but when I implemented it, I couldn't think of any other way to make sure it's not enabled...


astubbs commented May 6, 2022

The root issue here, however, is what PC should do with deserialization errors. Why don't we just add some handling logic to PC itself?

Also - in your code, aren't you actually skipping over ALL records if /any/ have a deserialization issue?

@JorgenRingen how are you handling this issue?

micr0farad (Author) commented

Also - in your code, aren't you actually skipping over ALL records if /any/ have a deserialization issue?

Yes, just skip and log the error


JorgenRingen commented May 6, 2022

We use kafka-streams in front with Avro, so it's not a practical issue for us, as we control the input topic. Our deserializer would also just log the error and return null.

However, it would be really nice if this was handled by the framework with sensible defaults. I kind of like the kafka-streams approach of default, customizable exception handlers configured through config: https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html#streamsconfigs_default.deserialization.exception.handler

It uses org.apache.kafka.streams.errors.LogAndFailExceptionHandler by default. There's also an org.apache.kafka.streams.errors.LogAndContinueExceptionHandler, which I think makes more sense in most scenarios. And you can implement your own with all sorts of different logic (write to a DLQ, for example).
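
For reference, a minimal sketch of wiring that up in a Streams application (property and handler names are as documented by Kafka Streams):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

class StreamsErrorHandlingConfig {

    // Swap the default LogAndFailExceptionHandler for the log-and-skip variant.
    static Properties withLogAndContinue(Properties props) {
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class);
        return props;
    }
}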

There's also "production-exception-handler" for serialization errors.

JorgenRingen commented

Btw, why isn't this a core feature of kafka-consumer? It seems like the only options are to create a "non-throwing" serde or to implement some sort of seek logic. Or to use spring-kafka, which seems like the most popular solution on SO 😆

astubbs added this to the 0.5.1 milestone May 12, 2022

astubbs commented May 13, 2022

See #291 for the implementation of SKIP and SHUTDOWN for user-function failures; it will be released in the next feature release. Keen to get feedback!

We should open a separate issue for deserialization issues:

bartman64 commented

I ran into the same start-up error when using PC with a Quarkus native build.


astubbs commented Nov 3, 2022

this.seek(topicPartition, offset + 1); // partition and offset retrieved from exception instance
BTW, this is incorrect: it assumes only a single record was polled from the broker.
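
For reference, the partition and offset in that snippet presumably come from org.apache.kafka.common.errors.RecordDeserializationException (available since Kafka 2.6), which exposes both. A hedged sketch of that catch block, with the caveat above in mind:

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.RecordDeserializationException;

class SkipOnDeserializationError {

    // Seek past the offending record and return an empty batch; as noted
    // above, seeking past the failure can also skip records polled alongside it.
    static <K, V> ConsumerRecords<K, V> pollSkipping(Consumer<K, V> consumer, Duration timeout) {
        try {
            return consumer.poll(timeout);
        } catch (RecordDeserializationException e) {
            consumer.seek(e.topicPartition(), e.offset() + 1);
            return ConsumerRecords.empty();
        }
    }
}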

astubbs self-assigned this Nov 3, 2022
astubbs added a commit that referenced this issue Nov 3, 2022
…afkaConsumer

Corrects use of reflection for field access, by using the KafkaConsumer class directly, instead of trying to find it from the provided consumer.
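
A sketch of what that correction amounts to, assuming the private field is still named coordinator: resolve the field against KafkaConsumer.class itself, after which Field#get accepts any KafkaConsumer instance, subclassed or not:

import java.lang.reflect.Field;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class CoordinatorFieldAccess {

    // Resolve the field on the class that declares it, not on the runtime
    // class of the consumer; Field#get then works for subclass instances too.
    static Object readCoordinator(Consumer<?, ?> consumer) throws ReflectiveOperationException {
        Field coordinatorField = KafkaConsumer.class.getDeclaredField("coordinator");
        coordinatorField.setAccessible(true);
        return coordinatorField.get(consumer); // requires `consumer instanceof KafkaConsumer`
    }
}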
astubbs added a commit that referenced this issue Nov 21, 2022
…afkaConsumer (#469)

Corrects use of reflection for field access, by using the KafkaConsumer class directly, instead of trying to find it from the provided consumer.

* docs: Release instructions tweak