Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes: #195 NoSuchFieldException when using consumer inherited from KafkaConsumer #469

Merged
merged 7 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.5

=== Fixes

* fixes: #195 NoSuchFieldException when using consumer inherited from KafkaConsumer (#469)

== 0.5.2.4

=== Improvements
Expand Down
6 changes: 6 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,12 @@ ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.5

=== Fixes

* fixes: #195 NoSuchFieldException when using consumer inherited from KafkaConsumer (#469)

== 0.5.2.4

=== Improvements
Expand Down
8 changes: 5 additions & 3 deletions RELEASE.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

`release:prepare -DautoVersionSubmodules=true -DpushChanges=false -Darguments=-DskipTests -Pci`

- Push the plugin's commits one at a time (otherwise need to request Jenkins to build the release tag - so this is a minor shortcut)
- Push the master branch with release and tag
- Trigger master builder to build the tag (this is needed to trigger the deployment flow)
- Wait for Jenkins to finish running the build (~15 minutes)
- Wait for Sonatype to publish from it's staging area (~15 minutes)
- Wait for Sonatype to publish from it's staging area (~15 minutes) https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-parent/[repo1 link]
- Verify the release is available on Maven Central https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-parent/[repo1 link]
- Create the release on GH from the tag
- Paste in the details from the changelog, save, share as discussion
- Announce on slack(s), mailing list,
- Announce on slack (community #clients and internal channels), mailing list, twitter
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static io.confluent.csid.utils.BackportUtils.toSeconds;
import static io.confluent.csid.utils.StringUtils.msg;
import static io.confluent.parallelconsumer.internal.State.*;
import static java.lang.Boolean.TRUE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static lombok.AccessLevel.PRIVATE;
Expand Down Expand Up @@ -406,28 +407,34 @@ public void onPartitionsLost(Collection<TopicPartition> partitions) {
* Nasty reflection to check if auto commit is disabled.
* <p>
* Other way would be to politely request the user also include their consumer properties when construction, but
* this is more reliable in a correctness sense, but britle in terms of coupling to internal implementation.
* this is more reliable in a correctness sense, but brittle in terms of coupling to internal implementation.
* Consider requesting ability to inspect configuration at runtime.
*/
@SneakyThrows
private void checkAutoCommitIsDisabled(org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
if (consumer instanceof KafkaConsumer) {
// Commons lang FieldUtils#readField - avoid needing commons lang
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); //NoSuchFieldException
coordinatorField.setAccessible(true);
ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer); //IllegalAccessException

if (coordinator == null)
throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? Reflection broken?");

Field autoCommitEnabledField = coordinator.getClass().getDeclaredField("autoCommitEnabled");
autoCommitEnabledField.setAccessible(true);
Boolean isAutoCommitEnabled = (Boolean) autoCommitEnabledField.get(coordinator);

if (isAutoCommitEnabled)
throw new IllegalArgumentException("Consumer auto commit must be disabled, as commits are handled by the library.");
} else {
// noop - probably MockConsumer being used in testing - which doesn't do auto commits
try {
if (consumer instanceof KafkaConsumer) {
// Could use Commons Lang FieldUtils#readField - but, avoid needing commons lang
Field coordinatorField = KafkaConsumer.class.getDeclaredField("coordinator");
coordinatorField.setAccessible(true);
ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer); //IllegalAccessException

if (coordinator == null)
throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? Reflection broken?");

Field autoCommitEnabledField = coordinator.getClass().getDeclaredField("autoCommitEnabled");
autoCommitEnabledField.setAccessible(true);
Boolean isAutoCommitEnabled = (Boolean) autoCommitEnabledField.get(coordinator);

if (TRUE.equals(isAutoCommitEnabled))
throw new ParallelConsumerException("Consumer auto commit must be disabled, as commits are handled by the library.");
} else if (consumer instanceof MockConsumer) {
log.debug("Detected MockConsumer class which doesn't do auto commits");
} else {
// Probably Mockito
log.error("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName());
}
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new IllegalStateException("Cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.confluent.parallelconsumer.integrationTests;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;

import java.util.Properties;

/**
* @author Antony Stubbs
*/
class CustomConsumersTest extends BrokerIntegrationTest {

/**
* Tests that extended consumer can be used with a custom consumer with PC.
* <p>
* Test for issue #195 - https://github.com/confluentinc/parallel-consumer/issues/195
*
* @see io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor#checkAutoCommitIsDisabled
*/
@Test
void extendedConsumer() { // NOSONAR
Properties properties = getKcu().setupConsumerProps(this.getClass().getSimpleName());
CustomConsumer<String, String> client = new CustomConsumer<>(properties);

ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
.consumer(client)
.build();

ParallelEoSStreamProcessor<String, String> pc = new ParallelEoSStreamProcessor<>(options);
}

static class CustomConsumer<K, V> extends KafkaConsumer<K, V> {

String customField = "custom";

public CustomConsumer(Properties configs) {
super(configs);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private Properties setupProducerProps() {
return producerProps;
}

private Properties setupConsumerProps(String groupIdToUse) {
public Properties setupConsumerProps(String groupIdToUse) {
var consumerProps = setupCommonProps();

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,15 +823,15 @@ void optionsGroupIdRequiredAndAutoCommitDisabled() {
optionsBuilder.consumer(new KafkaConsumer<>(properties, deserializer, deserializer));
assertThat(catchThrowable(() -> parallelConsumer = initPollingAsyncConsumer(optionsBuilder.build())))
.as("Should error on auto commit enabled by default")
.isInstanceOf(IllegalArgumentException.class)
.isInstanceOf(ParallelConsumerException.class)
.hasMessageContainingAll("auto", "commit", "disabled");

// fail auto commit disabled
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
optionsBuilder.consumer(new KafkaConsumer<>(properties, deserializer, deserializer));
assertThat(catchThrowable(() -> parallelConsumer = initPollingAsyncConsumer(optionsBuilder.build())))
.as("Should error on auto commit enabled")
.isInstanceOf(IllegalArgumentException.class)
.isInstanceOf(ParallelConsumerException.class)
.hasMessageContainingAll("auto", "commit", "disabled");

// set missing auto commit
Expand Down