Skip to content

Commit

Permalink
perf(server): leave consumer group on shutdown by default (#1151)
Browse files Browse the repository at this point in the history
There is an undocumented and unfortunate behavior in Kafka Streams such that if:
- One instance joins a group
- Another instance calls KafkaStreams#close() before the Rebalance is over
- then the close() call will block until max.poll.interval.ms

That is really not good. However, the KS community says they have no plans to fix it.

One interesting thing though is that if we send the 'LeaveGroupRequest' then close() returns immediately (as expected). This PR sets the default of the LHS_X_LEAVE_GROUP_ON_SHUTDOWN config to true, so that by default the LH Server Streams Topologies will send a LeaveGroupRequest, and allow for clean shutdowns.

We still leave the configuration there for adventurous users who want to play with it.
  • Loading branch information
coltmcnealy-lh authored Nov 21, 2024
1 parent 7451c28 commit 16db9ec
Showing 1 changed file with 5 additions and 10 deletions.
15 changes: 5 additions & 10 deletions server/src/main/java/io/littlehorse/common/LHServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -768,14 +768,6 @@ private String getFromConfigOrFile(String primary, String fileLocation, String d
return defaultVal;
}

/*
* EXPERIMENTAL: Internal config to determine whether the server should leave the
* group on shutdown
*/
public boolean leaveGroupOnShutdown() {
return getOrSetDefault(X_LEAVE_GROUP_ON_SHUTDOWN_KEY, "false").equals("true");
}

public Properties getCoreStreamsConfig() {
Properties result = getBaseStreamsConfig();
result.put("application.id", getKafkaGroupId("core"));
Expand Down Expand Up @@ -886,8 +878,11 @@ private Properties getBaseStreamsConfig() {
props.put(
StreamsConfig.adminClientPrefix(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG), "rebootstrap");

if (getOrSetDefault(X_LEAVE_GROUP_ON_SHUTDOWN_KEY, "false").equals("true")) {
log.warn("Using experimental internal config to leave group on shutdonw!");
if (getOrSetDefault(X_LEAVE_GROUP_ON_SHUTDOWN_KEY, "true").equalsIgnoreCase("false")) {
log.warn(
"Using experimental internal config LHS_X_LEAVE_GROUP_ON_SHUTDOWN to NOT leave group on shutdown!");
props.put(StreamsConfig.consumerPrefix("internal.leave.group.on.close"), false);
} else {
props.put(StreamsConfig.consumerPrefix("internal.leave.group.on.close"), true);
}

Expand Down

0 comments on commit 16db9ec

Please sign in to comment.