From b17eb7edae1f0a1aabccfde6a316f34c0d1ad8d2 Mon Sep 17 00:00:00 2001 From: Mark Cox Date: Thu, 17 Sep 2020 11:43:33 -0700 Subject: [PATCH] #748 - Allow config consumer.max.poll.records --- .../java/io/confluent/kafkarest/KafkaRestConfig.java | 9 ++++++++- .../io/confluent/kafkarest/v2/KafkaConsumerManager.java | 6 ++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/kafka-rest-common/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java b/kafka-rest-common/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java index 4554fb1b81..523aaf27cc 100644 --- a/kafka-rest-common/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java +++ b/kafka-rest-common/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java @@ -57,7 +57,7 @@ public class KafkaRestConfig extends RestConfig { private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()."; // ensures poll is frequently needed and called - public static final String MAX_POLL_RECORDS_VALUE = "30"; + public static final String MAX_POLL_RECORDS_DEFAULT = "30"; public static final String HOST_NAME_CONFIG = "host.name"; private static final String HOST_NAME_DOC = @@ -371,6 +371,13 @@ protected static ConfigDef baseKafkaRestConfigDef() { Importance.MEDIUM, CONSUMER_MAX_THREADS_DOC ) + .define( + MAX_POLL_RECORDS_CONFIG, + Type.INT, + MAX_POLL_RECORDS_DEFAULT, + Importance.MEDIUM, + MAX_POLL_RECORDS_DOC + ) .define( ZOOKEEPER_CONNECT_CONFIG, Type.STRING, diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java index 6664e9596c..87ba1e76fc 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java @@ -17,7 +17,6 @@ import static io.confluent.kafkarest.KafkaRestConfig.CONSUMER_MAX_THREADS_CONFIG; import static io.confluent.kafkarest.KafkaRestConfig.MAX_POLL_RECORDS_CONFIG; -import static io.confluent.kafkarest.KafkaRestConfig.MAX_POLL_RECORDS_VALUE; import io.confluent.kafkarest.ConsumerInstanceId; import io.confluent.kafkarest.ConsumerReadCallback; @@ -201,7 +200,10 @@ public String createConsumer(String group, ConsumerInstanceConfig instanceConfig //Properties props = (Properties) config.getOriginalProperties().clone(); Properties props = config.getConsumerProperties(); props.setProperty(KafkaRestConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); - props.setProperty(MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS_VALUE); + if (props.getProperty(MAX_POLL_RECORDS_CONFIG) == null) { + props.setProperty(MAX_POLL_RECORDS_CONFIG, + config.getInt(MAX_POLL_RECORDS_CONFIG).toString()); + } props.setProperty("group.id", group); // This ID we pass here has to be unique, only pass a value along if the deprecated ID field // was passed in. This generally shouldn't be used, but is maintained for compatibility.