diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties index 3821d071f..cafa86fcb 100644 --- a/src/main/config/secor.common.properties +++ b/src/main/config/secor.common.properties @@ -47,6 +47,12 @@ kafka.socket.receive.buffer.bytes= # Kafka fetch max size (fetch.message.max.bytes) kafka.fetch.message.max.bytes= +# Kafka fetch min bytes (fetch.fetch.min.bytes) +kafka.fetch.min.bytes= + +# Kafka fetch max wait ms (fetch.max.wait.ms) +kafka.fetch.wait.max.ms= + # Port of the broker serving topic partition metadata. kafka.seed.broker.port=9092 diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index 33e6a7a57..2cb7c3b9d 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -103,6 +103,14 @@ public String getSocketReceieveBufferBytes() { return getString("kafka.socket.receive.buffer.bytes"); } + public String getFetchMinBytes() { + return getString("kafka.fetch.min.bytes"); + } + + public String getFetchWaitMaxMs() { + return getString("kafka.fetch.wait.max.ms"); + } + public int getGeneration() { return getInt("secor.generation"); } diff --git a/src/main/java/com/pinterest/secor/reader/MessageReader.java b/src/main/java/com/pinterest/secor/reader/MessageReader.java index c24d9ef4b..12ccc22ff 100644 --- a/src/main/java/com/pinterest/secor/reader/MessageReader.java +++ b/src/main/java/com/pinterest/secor/reader/MessageReader.java @@ -125,6 +125,12 @@ private ConsumerConfig createConsumerConfig() throws UnknownHostException { if (mConfig.getFetchMessageMaxBytes() != null && !mConfig.getFetchMessageMaxBytes().isEmpty()) { props.put("fetch.message.max.bytes", mConfig.getFetchMessageMaxBytes()); } + if (mConfig.getFetchMinBytes() != null && !mConfig.getFetchMinBytes().isEmpty()) { + props.put("fetch.min.bytes", mConfig.getFetchMinBytes()); + } + if (mConfig.getFetchWaitMaxMs() != null && !mConfig.getFetchWaitMaxMs().isEmpty()) { + props.put("fetch.wait.max.ms", mConfig.getFetchWaitMaxMs()); + } return new ConsumerConfig(props); }