Skip to content

Commit

Permalink
Fix consumer timeout after broker restart
Browse files Browse the repository at this point in the history
  • Loading branch information
wushilin authored Jul 25, 2024
1 parent ab277c9 commit 68c0d0c
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,20 @@ protected void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offset
switch (commitMode) {
case PERIODIC_CONSUMER_SYNC -> {
log.debug("Committing offsets Sync");
consumerMgr.commitSync(offsetsToSend);
while(true) {
try {
consumerMgr.commitSync(offsetsToSend);
//break when offset commit is okay, never throw exception to the main threads
break;
} catch(Throwable t) {
log.error("Failed to commit offset. Retrying in 10 seconds", t);
try {
Thread.sleep(10000L);
} catch(InterruptedException ite) {
log.info("Giving up offset commit due to interruption");
}
}
}
}
case PERIODIC_CONSUMER_ASYNCHRONOUS -> {
//
Expand Down

0 comments on commit 68c0d0c

Please sign in to comment.