Skip to content

Commit

Permalink
Add support for additional Kafka fetch options
Browse files Browse the repository at this point in the history
  • Loading branch information
mthssdrbrg committed Feb 7, 2015
1 parent fe5ee21 commit d1a8b0e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/main/config/secor.common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ kafka.socket.receive.buffer.bytes=
# Kafka fetch max size (fetch.message.max.bytes)
kafka.fetch.message.max.bytes=

# Kafka fetch min bytes (fetch.fetch.min.bytes)
kafka.fetch.min.bytes=

# Kafka fetch max wait ms (fetch.max.wait.ms)
kafka.fetch.wait.max.ms=

# Port of the broker serving topic partition metadata.
kafka.seed.broker.port=9092

Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ public String getSocketReceieveBufferBytes() {
return getString("kafka.socket.receive.buffer.bytes");
}

public String getFetchMinBytes() {
return getString("kafka.fetch.min.bytes");
}

public String getFetchWaitMaxMs() {
return getString("kafka.fetch.wait.max.ms");
}

public int getGeneration() {
return getInt("secor.generation");
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/pinterest/secor/reader/MessageReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ private ConsumerConfig createConsumerConfig() throws UnknownHostException {
if (mConfig.getFetchMessageMaxBytes() != null && !mConfig.getFetchMessageMaxBytes().isEmpty()) {
props.put("fetch.message.max.bytes", mConfig.getFetchMessageMaxBytes());
}
if (mConfig.getFetchMinBytes() != null && !mConfig.getFetchMinBytes().isEmpty()) {
props.put("fetch.min.bytes", mConfig.getFetchMinBytes());
}
if (mConfig.getFetchWaitMaxMs() != null && !mConfig.getFetchWaitMaxMs().isEmpty()) {
props.put("fetch.wait.max.ms", mConfig.getFetchWaitMaxMs());
}

return new ConsumerConfig(props);
}
Expand Down

0 comments on commit d1a8b0e

Please sign in to comment.