Skip to content

Commit

Permalink
fixes: #195 NoSuchFieldException when using consumer inherited from K…
Browse files Browse the repository at this point in the history
…afkaConsumer

Corrects use of reflection for field access, by using the KafkaConsumer class directly, instead of trying to find it from the provided consumer.
  • Loading branch information
astubbs committed Nov 3, 2022
1 parent 585b584 commit 95ee9b1
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -406,28 +406,32 @@ public void onPartitionsLost(Collection<TopicPartition> partitions) {
* Nasty reflection to check if auto commit is disabled.
* <p>
* Other way would be to politely request the user also include their consumer properties when construction, but
* this is more reliable in a correctness sense, but britle in terms of coupling to internal implementation.
* this is more reliable in a correctness sense, but brittle in terms of coupling to internal implementation.
* Consider requesting ability to inspect configuration at runtime.
*/
@SneakyThrows
private void checkAutoCommitIsDisabled(org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
if (consumer instanceof KafkaConsumer) {
// Commons lang FieldUtils#readField - avoid needing commons lang
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); //NoSuchFieldException
coordinatorField.setAccessible(true);
ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer); //IllegalAccessException

if (coordinator == null)
throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? Reflection broken?");

Field autoCommitEnabledField = coordinator.getClass().getDeclaredField("autoCommitEnabled");
autoCommitEnabledField.setAccessible(true);
Boolean isAutoCommitEnabled = (Boolean) autoCommitEnabledField.get(coordinator);

if (isAutoCommitEnabled)
throw new IllegalArgumentException("Consumer auto commit must be disabled, as commits are handled by the library.");
} else {
// noop - probably MockConsumer being used in testing - which doesn't do auto commits
try {
if (consumer instanceof KafkaConsumer) {
Field coordinatorField = KafkaConsumer.class.getDeclaredField("coordinator");
coordinatorField.setAccessible(true);
ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer); //IllegalAccessException

if (coordinator == null)
throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? Reflection broken?");

Field autoCommitEnabledField = coordinator.getClass().getDeclaredField("autoCommitEnabled");
autoCommitEnabledField.setAccessible(true);
Boolean isAutoCommitEnabled = (Boolean) autoCommitEnabledField.get(coordinator);

if (isAutoCommitEnabled)
throw new ParallelConsumerException("Consumer auto commit must be disabled, as commits are handled by the library.");
} else if (consumer instanceof MockConsumer<K, V>) {
log.debug("Detected MockConsumer class which doesn't do auto commits");
} else {
throw new UnsupportedOperationException("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName());
}
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new IllegalStateException("Cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.confluent.parallelconsumer.integrationTests;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;

import java.util.Properties;

/**
* @author Antony Stubbs
*/
class CustomConsumersTest extends BrokerIntegrationTest {

/**
* Tests that extended consumer can be used with a custom consumer with PC.
* <p>
* Test for issue #195 - https://github.com/confluentinc/parallel-consumer/issues/195
*
* @see io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor#checkAutoCommitIsDisabled
*/
@Test
void extendedConsumer() { // NOSONAR
Properties properties = getKcu().setupConsumerProps(this.getClass().getSimpleName());
CustomConsumer<String, String> client = new CustomConsumer<>(properties);

ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
.consumer(client)
.build();

ParallelEoSStreamProcessor<String, String> pc = new ParallelEoSStreamProcessor<>(options);
}

static class CustomConsumer<K, V> extends KafkaConsumer<K, V> {

String customField = "custom";

public CustomConsumer(Properties configs) {
super(configs);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private Properties setupProducerProps() {
return producerProps;
}

private Properties setupConsumerProps(String groupIdToUse) {
public Properties setupConsumerProps(String groupIdToUse) {
var consumerProps = setupCommonProps();

//
Expand Down

0 comments on commit 95ee9b1

Please sign in to comment.